Tune caves
[oweals/minetest.git] / src / connection.cpp
1 /*
2 Minetest-c55
3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23 #include "log.h"
24 #include "porting.h"
25
26 namespace con
27 {
28
29 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
30                 u32 protocol_id, u16 sender_peer_id, u8 channel)
31 {
32         u32 packet_size = datasize + BASE_HEADER_SIZE;
33         BufferedPacket p(packet_size);
34         p.address = address;
35
36         writeU32(&p.data[0], protocol_id);
37         writeU16(&p.data[4], sender_peer_id);
38         writeU8(&p.data[6], channel);
39
40         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
41
42         return p;
43 }
44
45 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
46                 u32 protocol_id, u16 sender_peer_id, u8 channel)
47 {
48         return makePacket(address, *data, data.getSize(),
49                         protocol_id, sender_peer_id, channel);
50 }
51
52 SharedBuffer<u8> makeOriginalPacket(
53                 SharedBuffer<u8> data)
54 {
55         u32 header_size = 1;
56         u32 packet_size = data.getSize() + header_size;
57         SharedBuffer<u8> b(packet_size);
58
59         writeU8(&b[0], TYPE_ORIGINAL);
60
61         memcpy(&b[header_size], *data, data.getSize());
62
63         return b;
64 }
65
66 core::list<SharedBuffer<u8> > makeSplitPacket(
67                 SharedBuffer<u8> data,
68                 u32 chunksize_max,
69                 u16 seqnum)
70 {
71         // Chunk packets, containing the TYPE_SPLIT header
72         core::list<SharedBuffer<u8> > chunks;
73         
74         u32 chunk_header_size = 7;
75         u32 maximum_data_size = chunksize_max - chunk_header_size;
76         u32 start = 0;
77         u32 end = 0;
78         u32 chunk_num = 0;
79         do{
80                 end = start + maximum_data_size - 1;
81                 if(end > data.getSize() - 1)
82                         end = data.getSize() - 1;
83                 
84                 u32 payload_size = end - start + 1;
85                 u32 packet_size = chunk_header_size + payload_size;
86
87                 SharedBuffer<u8> chunk(packet_size);
88                 
89                 writeU8(&chunk[0], TYPE_SPLIT);
90                 writeU16(&chunk[1], seqnum);
91                 // [3] u16 chunk_count is written at next stage
92                 writeU16(&chunk[5], chunk_num);
93                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
94
95                 chunks.push_back(chunk);
96                 
97                 start = end + 1;
98                 chunk_num++;
99         }
100         while(end != data.getSize() - 1);
101
102         u16 chunk_count = chunks.getSize();
103
104         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
105         for(; i != chunks.end(); i++)
106         {
107                 // Write chunk_count
108                 writeU16(&((*i)[3]), chunk_count);
109         }
110
111         return chunks;
112 }
113
114 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
115                 SharedBuffer<u8> data,
116                 u32 chunksize_max,
117                 u16 &split_seqnum)
118 {
119         u32 original_header_size = 1;
120         core::list<SharedBuffer<u8> > list;
121         if(data.getSize() + original_header_size > chunksize_max)
122         {
123                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
124                 split_seqnum++;
125                 return list;
126         }
127         else
128         {
129                 list.push_back(makeOriginalPacket(data));
130         }
131         return list;
132 }
133
134 SharedBuffer<u8> makeReliablePacket(
135                 SharedBuffer<u8> data,
136                 u16 seqnum)
137 {
138         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
139         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
140                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
141         u32 header_size = 3;
142         u32 packet_size = data.getSize() + header_size;
143         SharedBuffer<u8> b(packet_size);
144
145         writeU8(&b[0], TYPE_RELIABLE);
146         writeU16(&b[1], seqnum);
147
148         memcpy(&b[header_size], *data, data.getSize());
149
150         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
151                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
152         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
153         return b;
154 }
155
156 /*
157         ReliablePacketBuffer
158 */
159
160 void ReliablePacketBuffer::print()
161 {
162         core::list<BufferedPacket>::Iterator i;
163         i = m_list.begin();
164         for(; i != m_list.end(); i++)
165         {
166                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
167                 dout_con<<s<<" ";
168         }
169 }
170 bool ReliablePacketBuffer::empty()
171 {
172         return m_list.empty();
173 }
174 u32 ReliablePacketBuffer::size()
175 {
176         return m_list.getSize();
177 }
178 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
179 {
180         core::list<BufferedPacket>::Iterator i;
181         i = m_list.begin();
182         for(; i != m_list.end(); i++)
183         {
184                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
186                                 <<", comparing to s="<<s<<std::endl;*/
187                 if(s == seqnum)
188                         break;
189         }
190         return i;
191 }
192 RPBSearchResult ReliablePacketBuffer::notFound()
193 {
194         return m_list.end();
195 }
196 u16 ReliablePacketBuffer::getFirstSeqnum()
197 {
198         if(empty())
199                 throw NotFoundException("Buffer is empty");
200         BufferedPacket p = *m_list.begin();
201         return readU16(&p.data[BASE_HEADER_SIZE+1]);
202 }
203 BufferedPacket ReliablePacketBuffer::popFirst()
204 {
205         if(empty())
206                 throw NotFoundException("Buffer is empty");
207         BufferedPacket p = *m_list.begin();
208         core::list<BufferedPacket>::Iterator i = m_list.begin();
209         m_list.erase(i);
210         return p;
211 }
212 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
213 {
214         RPBSearchResult r = findPacket(seqnum);
215         if(r == notFound()){
216                 dout_con<<"Not found"<<std::endl;
217                 throw NotFoundException("seqnum not found in buffer");
218         }
219         BufferedPacket p = *r;
220         m_list.erase(r);
221         return p;
222 }
223 void ReliablePacketBuffer::insert(BufferedPacket &p)
224 {
225         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
226         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
227         assert(type == TYPE_RELIABLE);
228         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
229
230         // Find the right place for the packet and insert it there
231
232         // If list is empty, just add it
233         if(m_list.empty())
234         {
235                 m_list.push_back(p);
236                 // Done.
237                 return;
238         }
239         // Otherwise find the right place
240         core::list<BufferedPacket>::Iterator i;
241         i = m_list.begin();
242         // Find the first packet in the list which has a higher seqnum
243         for(; i != m_list.end(); i++){
244                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
245                 if(s == seqnum){
246                         throw AlreadyExistsException("Same seqnum in list");
247                 }
248                 if(seqnum_higher(s, seqnum)){
249                         break;
250                 }
251         }
252         // If we're at the end of the list, add the packet to the
253         // end of the list
254         if(i == m_list.end())
255         {
256                 m_list.push_back(p);
257                 // Done.
258                 return;
259         }
260         // Insert before i
261         m_list.insert_before(i, p);
262 }
263
264 void ReliablePacketBuffer::incrementTimeouts(float dtime)
265 {
266         core::list<BufferedPacket>::Iterator i;
267         i = m_list.begin();
268         for(; i != m_list.end(); i++){
269                 i->time += dtime;
270                 i->totaltime += dtime;
271         }
272 }
273
274 void ReliablePacketBuffer::resetTimedOuts(float timeout)
275 {
276         core::list<BufferedPacket>::Iterator i;
277         i = m_list.begin();
278         for(; i != m_list.end(); i++){
279                 if(i->time >= timeout)
280                         i->time = 0.0;
281         }
282 }
283
284 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
285 {
286         core::list<BufferedPacket>::Iterator i;
287         i = m_list.begin();
288         for(; i != m_list.end(); i++){
289                 if(i->totaltime >= timeout)
290                         return true;
291         }
292         return false;
293 }
294
295 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
296 {
297         core::list<BufferedPacket> timed_outs;
298         core::list<BufferedPacket>::Iterator i;
299         i = m_list.begin();
300         for(; i != m_list.end(); i++)
301         {
302                 if(i->time >= timeout)
303                         timed_outs.push_back(*i);
304         }
305         return timed_outs;
306 }
307
308 /*
309         IncomingSplitBuffer
310 */
311
312 IncomingSplitBuffer::~IncomingSplitBuffer()
313 {
314         core::map<u16, IncomingSplitPacket*>::Iterator i;
315         i = m_buf.getIterator();
316         for(; i.atEnd() == false; i++)
317         {
318                 delete i.getNode()->getValue();
319         }
320 }
321 /*
322         This will throw a GotSplitPacketException when a full
323         split packet is constructed.
324 */
325 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
326 {
327         u32 headersize = BASE_HEADER_SIZE + 7;
328         assert(p.data.getSize() >= headersize);
329         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
330         assert(type == TYPE_SPLIT);
331         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
332         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
333         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
334
335         // Add if doesn't exist
336         if(m_buf.find(seqnum) == NULL)
337         {
338                 IncomingSplitPacket *sp = new IncomingSplitPacket();
339                 sp->chunk_count = chunk_count;
340                 sp->reliable = reliable;
341                 m_buf[seqnum] = sp;
342         }
343         
344         IncomingSplitPacket *sp = m_buf[seqnum];
345         
346         // TODO: These errors should be thrown or something? Dunno.
347         if(chunk_count != sp->chunk_count)
348                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
349                                 <<" != sp->chunk_count="<<sp->chunk_count
350                                 <<std::endl;
351         if(reliable != sp->reliable)
352                 derr_con<<"Connection: WARNING: reliable="<<reliable
353                                 <<" != sp->reliable="<<sp->reliable
354                                 <<std::endl;
355
356         // If chunk already exists, cancel
357         if(sp->chunks.find(chunk_num) != NULL)
358                 throw AlreadyExistsException("Chunk already in buffer");
359         
360         // Cut chunk data out of packet
361         u32 chunkdatasize = p.data.getSize() - headersize;
362         SharedBuffer<u8> chunkdata(chunkdatasize);
363         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
364         
365         // Set chunk data in buffer
366         sp->chunks[chunk_num] = chunkdata;
367         
368         // If not all chunks are received, return empty buffer
369         if(sp->allReceived() == false)
370                 return SharedBuffer<u8>();
371
372         // Calculate total size
373         u32 totalsize = 0;
374         core::map<u16, SharedBuffer<u8> >::Iterator i;
375         i = sp->chunks.getIterator();
376         for(; i.atEnd() == false; i++)
377         {
378                 totalsize += i.getNode()->getValue().getSize();
379         }
380         
381         SharedBuffer<u8> fulldata(totalsize);
382
383         // Copy chunks to data buffer
384         u32 start = 0;
385         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
386                         chunk_i++)
387         {
388                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
389                 u16 chunkdatasize = buf.getSize();
390                 memcpy(&fulldata[start], *buf, chunkdatasize);
391                 start += chunkdatasize;;
392         }
393
394         // Remove sp from buffer
395         m_buf.remove(seqnum);
396         delete sp;
397
398         return fulldata;
399 }
400 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
401 {
402         core::list<u16> remove_queue;
403         core::map<u16, IncomingSplitPacket*>::Iterator i;
404         i = m_buf.getIterator();
405         for(; i.atEnd() == false; i++)
406         {
407                 IncomingSplitPacket *p = i.getNode()->getValue();
408                 // Reliable ones are not removed by timeout
409                 if(p->reliable == true)
410                         continue;
411                 p->time += dtime;
412                 if(p->time >= timeout)
413                         remove_queue.push_back(i.getNode()->getKey());
414         }
415         core::list<u16>::Iterator j;
416         j = remove_queue.begin();
417         for(; j != remove_queue.end(); j++)
418         {
419                 dout_con<<"NOTE: Removing timed out unreliable split packet"
420                                 <<std::endl;
421                 delete m_buf[*j];
422                 m_buf.remove(*j);
423         }
424 }
425
426 /*
427         Channel
428 */
429
430 Channel::Channel()
431 {
432         next_outgoing_seqnum = SEQNUM_INITIAL;
433         next_incoming_seqnum = SEQNUM_INITIAL;
434         next_outgoing_split_seqnum = SEQNUM_INITIAL;
435 }
436 Channel::~Channel()
437 {
438 }
439
440 /*
441         Peer
442 */
443
444 Peer::Peer(u16 a_id, Address a_address):
445         address(a_address),
446         id(a_id),
447         timeout_counter(0.0),
448         ping_timer(0.0),
449         resend_timeout(0.5),
450         avg_rtt(-1.0),
451         has_sent_with_id(false),
452         m_sendtime_accu(0),
453         m_max_packets_per_second(10),
454         m_num_sent(0),
455         m_max_num_sent(0)
456 {
457 }
458 Peer::~Peer()
459 {
460 }
461
462 void Peer::reportRTT(float rtt)
463 {
464         if(rtt >= 0.0){
465                 if(rtt < 0.01){
466                         if(m_max_packets_per_second < 400)
467                                 m_max_packets_per_second += 10;
468                 } else if(rtt < 0.2){
469                         if(m_max_packets_per_second < 100)
470                                 m_max_packets_per_second += 2;
471                 } else {
472                         m_max_packets_per_second *= 0.8;
473                         if(m_max_packets_per_second < 10)
474                                 m_max_packets_per_second = 10;
475                 }
476         }
477
478         if(rtt < -0.999)
479         {}
480         else if(avg_rtt < 0.0)
481                 avg_rtt = rtt;
482         else
483                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
484         
485         // Calculate resend_timeout
486
487         /*int reliable_count = 0;
488         for(int i=0; i<CHANNEL_COUNT; i++)
489         {
490                 reliable_count += channels[i].outgoing_reliables.size();
491         }
492         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
493                         * ((float)reliable_count * 1);*/
494         
495         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
496         if(timeout < RESEND_TIMEOUT_MIN)
497                 timeout = RESEND_TIMEOUT_MIN;
498         if(timeout > RESEND_TIMEOUT_MAX)
499                 timeout = RESEND_TIMEOUT_MAX;
500         resend_timeout = timeout;
501 }
502                                 
503 /*
504         Connection
505 */
506
507 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
508         m_protocol_id(protocol_id),
509         m_max_packet_size(max_packet_size),
510         m_timeout(timeout),
511         m_peer_id(0),
512         m_bc_peerhandler(NULL),
513         m_bc_receive_timeout(0),
514         m_indentation(0)
515 {
516         m_socket.setTimeoutMs(5);
517
518         Start();
519 }
520
521 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
522                 PeerHandler *peerhandler):
523         m_protocol_id(protocol_id),
524         m_max_packet_size(max_packet_size),
525         m_timeout(timeout),
526         m_peer_id(0),
527         m_bc_peerhandler(peerhandler),
528         m_bc_receive_timeout(0),
529         m_indentation(0)
530 {
531         m_socket.setTimeoutMs(5);
532
533         Start();
534 }
535
536
537 Connection::~Connection()
538 {
539         stop();
540         // Delete peers
541         for(core::map<u16, Peer*>::Iterator
542                         j = m_peers.getIterator();
543                         j.atEnd() == false; j++)
544         {
545                 Peer *peer = j.getNode()->getValue();
546                 delete peer;
547         }
548 }
549
550 /* Internal stuff */
551
552 void * Connection::Thread()
553 {
554         ThreadStarted();
555         log_register_thread("Connection");
556
557         dout_con<<"Connection thread started"<<std::endl;
558         
559         u32 curtime = porting::getTimeMs();
560         u32 lasttime = curtime;
561
562         while(getRun())
563         {
564                 BEGIN_DEBUG_EXCEPTION_HANDLER
565                 
566                 lasttime = curtime;
567                 curtime = porting::getTimeMs();
568                 float dtime = (float)(curtime - lasttime) / 1000.;
569                 if(dtime > 0.1)
570                         dtime = 0.1;
571                 if(dtime < 0.0)
572                         dtime = 0.0;
573                 
574                 runTimeouts(dtime);
575
576                 while(m_command_queue.size() != 0){
577                         ConnectionCommand c = m_command_queue.pop_front();
578                         processCommand(c);
579                 }
580
581                 send(dtime);
582
583                 receive();
584                 
585                 END_DEBUG_EXCEPTION_HANDLER(derr_con);
586         }
587
588         return NULL;
589 }
590
591 void Connection::putEvent(ConnectionEvent &e)
592 {
593         assert(e.type != CONNEVENT_NONE);
594         m_event_queue.push_back(e);
595 }
596
597 void Connection::processCommand(ConnectionCommand &c)
598 {
599         switch(c.type){
600         case CONNCMD_NONE:
601                 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
602                 return;
603         case CONNCMD_SERVE:
604                 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
605                                 <<c.port<<std::endl;
606                 serve(c.port);
607                 return;
608         case CONNCMD_CONNECT:
609                 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
610                 connect(c.address);
611                 return;
612         case CONNCMD_DISCONNECT:
613                 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
614                 disconnect();
615                 return;
616         case CONNCMD_SEND:
617                 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
618                 send(c.peer_id, c.channelnum, c.data, c.reliable);
619                 return;
620         case CONNCMD_SEND_TO_ALL:
621                 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
622                 sendToAll(c.channelnum, c.data, c.reliable);
623                 return;
624         case CONNCMD_DELETE_PEER:
625                 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
626                 deletePeer(c.peer_id, false);
627                 return;
628         }
629 }
630
631 void Connection::send(float dtime)
632 {
633         for(core::map<u16, Peer*>::Iterator
634                         j = m_peers.getIterator();
635                         j.atEnd() == false; j++)
636         {
637                 Peer *peer = j.getNode()->getValue();
638                 peer->m_sendtime_accu += dtime;
639                 peer->m_num_sent = 0;
640                 peer->m_max_num_sent = peer->m_sendtime_accu *
641                                 peer->m_max_packets_per_second;
642         }
643         Queue<OutgoingPacket> postponed_packets;
644         while(m_outgoing_queue.size() != 0){
645                 OutgoingPacket packet = m_outgoing_queue.pop_front();
646                 Peer *peer = getPeerNoEx(packet.peer_id);
647                 if(!peer)
648                         continue;
649                 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
650                         postponed_packets.push_back(packet);
651                 } else if(peer->m_num_sent < peer->m_max_num_sent){
652                         rawSendAsPacket(packet.peer_id, packet.channelnum,
653                                         packet.data, packet.reliable);
654                         peer->m_num_sent++;
655                 } else {
656                         postponed_packets.push_back(packet);
657                 }
658         }
659         while(postponed_packets.size() != 0){
660                 m_outgoing_queue.push_back(postponed_packets.pop_front());
661         }
662         for(core::map<u16, Peer*>::Iterator
663                         j = m_peers.getIterator();
664                         j.atEnd() == false; j++)
665         {
666                 Peer *peer = j.getNode()->getValue();
667                 peer->m_sendtime_accu -= (float)peer->m_num_sent /
668                                 peer->m_max_packets_per_second;
669                 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
670                         peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
671         }
672 }
673
674 // Receive packets from the network and buffers and create ConnectionEvents
675 void Connection::receive()
676 {
677         u32 datasize = m_max_packet_size * 2;  // Double it just to be safe
678         // TODO: We can not know how many layers of header there are.
679         // For now, just assume there are no other than the base headers.
680         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
681         SharedBuffer<u8> packetdata(packet_maxsize);
682
683         bool single_wait_done = false;
684         
685         for(;;)
686         {
687         try{
688                 /* Check if some buffer has relevant data */
689                 {
690                         u16 peer_id;
691                         SharedBuffer<u8> resultdata;
692                         bool got = getFromBuffers(peer_id, resultdata);
693                         if(got){
694                                 ConnectionEvent e;
695                                 e.dataReceived(peer_id, resultdata);
696                                 putEvent(e);
697                                 continue;
698                         }
699                 }
700                 
701                 if(single_wait_done){
702                         if(m_socket.WaitData(0) == false)
703                                 break;
704                 }
705                 
706                 single_wait_done = true;
707
708                 Address sender;
709                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
710
711                 if(received_size < 0)
712                         break;
713                 if(received_size < BASE_HEADER_SIZE)
714                         continue;
715                 if(readU32(&packetdata[0]) != m_protocol_id)
716                         continue;
717                 
718                 u16 peer_id = readPeerId(*packetdata);
719                 u8 channelnum = readChannel(*packetdata);
720                 if(channelnum > CHANNEL_COUNT-1){
721                         PrintInfo(derr_con);
722                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
723                         throw InvalidIncomingDataException("Channel doesn't exist");
724                 }
725
726                 if(peer_id == PEER_ID_INEXISTENT)
727                 {
728                         /*
729                                 Somebody is trying to send stuff to us with no peer id.
730                                 
731                                 Check if the same address and port was added to our peer
732                                 list before.
733                                 Allow only entries that have has_sent_with_id==false.
734                         */
735
736                         core::map<u16, Peer*>::Iterator j;
737                         j = m_peers.getIterator();
738                         for(; j.atEnd() == false; j++)
739                         {
740                                 Peer *peer = j.getNode()->getValue();
741                                 if(peer->has_sent_with_id)
742                                         continue;
743                                 if(peer->address == sender)
744                                         break;
745                         }
746                         
747                         /*
748                                 If no peer was found with the same address and port,
749                                 we shall assume it is a new peer and create an entry.
750                         */
751                         if(j.atEnd())
752                         {
753                                 // Pass on to adding the peer
754                         }
755                         // Else: A peer was found.
756                         else
757                         {
758                                 Peer *peer = j.getNode()->getValue();
759                                 peer_id = peer->id;
760                                 PrintInfo(derr_con);
761                                 derr_con<<"WARNING: Assuming unknown peer to be "
762                                                 <<"peer_id="<<peer_id<<std::endl;
763                         }
764                 }
765                 
766                 /*
767                         The peer was not found in our lists. Add it.
768                 */
769                 if(peer_id == PEER_ID_INEXISTENT)
770                 {
771                         // Somebody wants to make a new connection
772
773                         // Get a unique peer id (2 or higher)
774                         u16 peer_id_new = 2;
775                         /*
776                                 Find an unused peer id
777                         */
778                         bool out_of_ids = false;
779                         for(;;)
780                         {
781                                 // Check if exists
782                                 if(m_peers.find(peer_id_new) == NULL)
783                                         break;
784                                 // Check for overflow
785                                 if(peer_id_new == 65535){
786                                         out_of_ids = true;
787                                         break;
788                                 }
789                                 peer_id_new++;
790                         }
791                         if(out_of_ids){
792                                 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
793                                 continue;
794                         }
795
796                         PrintInfo();
797                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
798                                         " giving peer_id="<<peer_id_new<<std::endl;
799
800                         // Create a peer
801                         Peer *peer = new Peer(peer_id_new, sender);
802                         m_peers.insert(peer->id, peer);
803                         
804                         // Create peer addition event
805                         ConnectionEvent e;
806                         e.peerAdded(peer_id_new, sender);
807                         putEvent(e);
808                         
809                         // Create CONTROL packet to tell the peer id to the new peer.
810                         SharedBuffer<u8> reply(4);
811                         writeU8(&reply[0], TYPE_CONTROL);
812                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
813                         writeU16(&reply[2], peer_id_new);
814                         sendAsPacket(peer_id_new, 0, reply, true);
815                         
816                         // We're now talking to a valid peer_id
817                         peer_id = peer_id_new;
818
819                         // Go on and process whatever it sent
820                 }
821
822                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
823
824                 if(node == NULL)
825                 {
826                         // Peer not found
827                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
828                         // and it is invalid.
829                         PrintInfo(derr_con);
830                         derr_con<<"Receive(): Peer not found"<<std::endl;
831                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
832                 }
833
834                 Peer *peer = node->getValue();
835
836                 // Validate peer address
837                 if(peer->address != sender)
838                 {
839                         PrintInfo(derr_con);
840                         derr_con<<"Peer "<<peer_id<<" sending from different address."
841                                         " Ignoring."<<std::endl;
842                         continue;
843                 }
844                 
845                 peer->timeout_counter = 0.0;
846
847                 Channel *channel = &(peer->channels[channelnum]);
848                 
849                 // Throw the received packet to channel->processPacket()
850
851                 // Make a new SharedBuffer from the data without the base headers
852                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
853                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
854                                 strippeddata.getSize());
855                 
856                 try{
857                         // Process it (the result is some data with no headers made by us)
858                         SharedBuffer<u8> resultdata = processPacket
859                                         (channel, strippeddata, peer_id, channelnum, false);
860                         
861                         PrintInfo();
862                         dout_con<<"ProcessPacket returned data of size "
863                                         <<resultdata.getSize()<<std::endl;
864                         
865                         ConnectionEvent e;
866                         e.dataReceived(peer_id, resultdata);
867                         putEvent(e);
868                         continue;
869                 }catch(ProcessedSilentlyException &e){
870                 }
871         }catch(InvalidIncomingDataException &e){
872         }
873         catch(ProcessedSilentlyException &e){
874         }
875         } // for
876 }
877
878 void Connection::runTimeouts(float dtime)
879 {
880         core::list<u16> timeouted_peers;
881         core::map<u16, Peer*>::Iterator j;
882         j = m_peers.getIterator();
883         for(; j.atEnd() == false; j++)
884         {
885                 Peer *peer = j.getNode()->getValue();
886                 
887                 /*
888                         Check peer timeout
889                 */
890                 peer->timeout_counter += dtime;
891                 if(peer->timeout_counter > m_timeout)
892                 {
893                         PrintInfo(derr_con);
894                         derr_con<<"RunTimeouts(): Peer "<<peer->id
895                                         <<" has timed out."
896                                         <<" (source=peer->timeout_counter)"
897                                         <<std::endl;
898                         // Add peer to the list
899                         timeouted_peers.push_back(peer->id);
900                         // Don't bother going through the buffers of this one
901                         continue;
902                 }
903
904                 float resend_timeout = peer->resend_timeout;
905                 for(u16 i=0; i<CHANNEL_COUNT; i++)
906                 {
907                         core::list<BufferedPacket> timed_outs;
908                         core::list<BufferedPacket>::Iterator j;
909                         
910                         Channel *channel = &peer->channels[i];
911
912                         // Remove timed out incomplete unreliable split packets
913                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
914                         
915                         // Increment reliable packet times
916                         channel->outgoing_reliables.incrementTimeouts(dtime);
917
918                         // Check reliable packet total times, remove peer if
919                         // over timeout.
920                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
921                         {
922                                 PrintInfo(derr_con);
923                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
924                                                 <<" has timed out."
925                                                 <<" (source=reliable packet totaltime)"
926                                                 <<std::endl;
927                                 // Add peer to the to-be-removed list
928                                 timeouted_peers.push_back(peer->id);
929                                 goto nextpeer;
930                         }
931
932                         // Re-send timed out outgoing reliables
933                         
934                         timed_outs = channel->
935                                         outgoing_reliables.getTimedOuts(resend_timeout);
936
937                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
938
939                         j = timed_outs.begin();
940                         for(; j != timed_outs.end(); j++)
941                         {
942                                 u16 peer_id = readPeerId(*(j->data));
943                                 u8 channel = readChannel(*(j->data));
944                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
945
946                                 PrintInfo(derr_con);
947                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
948                                 j->address.print(&derr_con);
949                                 derr_con<<"(t/o="<<resend_timeout<<"): "
950                                                 <<"from_peer_id="<<peer_id
951                                                 <<", channel="<<((int)channel&0xff)
952                                                 <<", seqnum="<<seqnum
953                                                 <<std::endl;
954
955                                 rawSend(*j);
956
957                                 // Enlarge avg_rtt and resend_timeout:
958                                 // The rtt will be at least the timeout.
959                                 // NOTE: This won't affect the timeout of the next
960                                 // checked channel because it was cached.
961                                 peer->reportRTT(resend_timeout);
962                         }
963                 }
964                 
965                 /*
966                         Send pings
967                 */
968                 peer->ping_timer += dtime;
969                 if(peer->ping_timer >= 5.0)
970                 {
971                         // Create and send PING packet
972                         SharedBuffer<u8> data(2);
973                         writeU8(&data[0], TYPE_CONTROL);
974                         writeU8(&data[1], CONTROLTYPE_PING);
975                         rawSendAsPacket(peer->id, 0, data, true);
976
977                         peer->ping_timer = 0.0;
978                 }
979                 
980 nextpeer:
981                 continue;
982         }
983
984         // Remove timed out peers
985         core::list<u16>::Iterator i = timeouted_peers.begin();
986         for(; i != timeouted_peers.end(); i++)
987         {
988                 PrintInfo(derr_con);
989                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
990                 deletePeer(*i, true);
991         }
992 }
993
994 void Connection::serve(u16 port)
995 {
996         dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
997         try{
998                 m_socket.Bind(port);
999                 m_peer_id = PEER_ID_SERVER;
1000         }
1001         catch(SocketException &e){
1002                 // Create event
1003                 ConnectionEvent ce;
1004                 ce.bindFailed();
1005                 putEvent(ce);
1006         }
1007 }
1008
1009 void Connection::connect(Address address)
1010 {
1011         dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1012                         <<":"<<address.getPort()<<std::endl;
1013
1014         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1015         if(node != NULL){
1016                 throw ConnectionException("Already connected to a server");
1017         }
1018
1019         Peer *peer = new Peer(PEER_ID_SERVER, address);
1020         m_peers.insert(peer->id, peer);
1021
1022         // Create event
1023         ConnectionEvent e;
1024         e.peerAdded(peer->id, peer->address);
1025         putEvent(e);
1026         
1027         m_socket.Bind(0);
1028         
1029         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1030         m_peer_id = PEER_ID_INEXISTENT;
1031         SharedBuffer<u8> data(0);
1032         Send(PEER_ID_SERVER, 0, data, true);
1033 }
1034
1035 void Connection::disconnect()
1036 {
1037         dout_con<<getDesc()<<" disconnecting"<<std::endl;
1038
1039         // Create and send DISCO packet
1040         SharedBuffer<u8> data(2);
1041         writeU8(&data[0], TYPE_CONTROL);
1042         writeU8(&data[1], CONTROLTYPE_DISCO);
1043         
1044         // Send to all
1045         core::map<u16, Peer*>::Iterator j;
1046         j = m_peers.getIterator();
1047         for(; j.atEnd() == false; j++)
1048         {
1049                 Peer *peer = j.getNode()->getValue();
1050                 rawSendAsPacket(peer->id, 0, data, false);
1051         }
1052 }
1053
1054 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1055 {
1056         core::map<u16, Peer*>::Iterator j;
1057         j = m_peers.getIterator();
1058         for(; j.atEnd() == false; j++)
1059         {
1060                 Peer *peer = j.getNode()->getValue();
1061                 send(peer->id, channelnum, data, reliable);
1062         }
1063 }
1064
1065 void Connection::send(u16 peer_id, u8 channelnum,
1066                 SharedBuffer<u8> data, bool reliable)
1067 {
1068         dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1069
1070         assert(channelnum < CHANNEL_COUNT);
1071         
1072         Peer *peer = getPeerNoEx(peer_id);
1073         if(peer == NULL)
1074                 return;
1075         Channel *channel = &(peer->channels[channelnum]);
1076
1077         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1078         if(reliable)
1079                 chunksize_max -= RELIABLE_HEADER_SIZE;
1080
1081         core::list<SharedBuffer<u8> > originals;
1082         originals = makeAutoSplitPacket(data, chunksize_max,
1083                         channel->next_outgoing_split_seqnum);
1084         
1085         core::list<SharedBuffer<u8> >::Iterator i;
1086         i = originals.begin();
1087         for(; i != originals.end(); i++)
1088         {
1089                 SharedBuffer<u8> original = *i;
1090                 
1091                 sendAsPacket(peer_id, channelnum, original, reliable);
1092         }
1093 }
1094
1095 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1096                 SharedBuffer<u8> data, bool reliable)
1097 {
1098         OutgoingPacket packet(peer_id, channelnum, data, reliable);
1099         m_outgoing_queue.push_back(packet);
1100 }
1101
1102 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1103                 SharedBuffer<u8> data, bool reliable)
1104 {
1105         Peer *peer = getPeerNoEx(peer_id);
1106         if(!peer)
1107                 return;
1108         Channel *channel = &(peer->channels[channelnum]);
1109
1110         if(reliable)
1111         {
1112                 u16 seqnum = channel->next_outgoing_seqnum;
1113                 channel->next_outgoing_seqnum++;
1114
1115                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1116
1117                 // Add base headers and make a packet
1118                 BufferedPacket p = makePacket(peer->address, reliable,
1119                                 m_protocol_id, m_peer_id, channelnum);
1120                 
1121                 try{
1122                         // Buffer the packet
1123                         channel->outgoing_reliables.insert(p);
1124                 }
1125                 catch(AlreadyExistsException &e)
1126                 {
1127                         PrintInfo(derr_con);
1128                         derr_con<<"WARNING: Going to send a reliable packet "
1129                                         "seqnum="<<seqnum<<" that is already "
1130                                         "in outgoing buffer"<<std::endl;
1131                         //assert(0);
1132                 }
1133                 
1134                 // Send the packet
1135                 rawSend(p);
1136         }
1137         else
1138         {
1139                 // Add base headers and make a packet
1140                 BufferedPacket p = makePacket(peer->address, data,
1141                                 m_protocol_id, m_peer_id, channelnum);
1142
1143                 // Send the packet
1144                 rawSend(p);
1145         }
1146 }
1147
1148 void Connection::rawSend(const BufferedPacket &packet)
1149 {
1150         try{
1151                 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1152         } catch(SendFailedException &e){
1153                 derr_con<<"Connection::rawSend(): SendFailedException: "
1154                                 <<packet.address.serializeString()<<std::endl;
1155         }
1156 }
1157
1158 Peer* Connection::getPeer(u16 peer_id)
1159 {
1160         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1161
1162         if(node == NULL){
1163                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1164         }
1165
1166         // Error checking
1167         assert(node->getValue()->id == peer_id);
1168
1169         return node->getValue();
1170 }
1171
1172 Peer* Connection::getPeerNoEx(u16 peer_id)
1173 {
1174         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1175
1176         if(node == NULL){
1177                 return NULL;
1178         }
1179
1180         // Error checking
1181         assert(node->getValue()->id == peer_id);
1182
1183         return node->getValue();
1184 }
1185
1186 core::list<Peer*> Connection::getPeers()
1187 {
1188         core::list<Peer*> list;
1189         core::map<u16, Peer*>::Iterator j;
1190         j = m_peers.getIterator();
1191         for(; j.atEnd() == false; j++)
1192         {
1193                 Peer *peer = j.getNode()->getValue();
1194                 list.push_back(peer);
1195         }
1196         return list;
1197 }
1198
1199 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1200 {
1201         core::map<u16, Peer*>::Iterator j;
1202         j = m_peers.getIterator();
1203         for(; j.atEnd() == false; j++)
1204         {
1205                 Peer *peer = j.getNode()->getValue();
1206                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1207                 {
1208                         Channel *channel = &peer->channels[i];
1209                         SharedBuffer<u8> resultdata;
1210                         bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1211                         if(got){
1212                                 dst = resultdata;
1213                                 return true;
1214                         }
1215                 }
1216         }
1217         return false;
1218 }
1219
1220 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1221                 SharedBuffer<u8> &dst)
1222 {
1223         u16 firstseqnum = 0;
1224         // Clear old packets from start of buffer
1225         try{
1226         for(;;){
1227                 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1228                 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1229                         channel->incoming_reliables.popFirst();
1230                 else
1231                         break;
1232         }
1233         // This happens if all packets are old
1234         }catch(con::NotFoundException)
1235         {}
1236         
1237         if(channel->incoming_reliables.empty() == false)
1238         {
1239                 if(firstseqnum == channel->next_incoming_seqnum)
1240                 {
1241                         BufferedPacket p = channel->incoming_reliables.popFirst();
1242                         
1243                         peer_id = readPeerId(*p.data);
1244                         u8 channelnum = readChannel(*p.data);
1245                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1246
1247                         PrintInfo();
1248                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
1249                                         <<" seqnum="<<seqnum
1250                                         <<" peer_id="<<peer_id
1251                                         <<" channel="<<((int)channelnum&0xff)
1252                                         <<std::endl;
1253
1254                         channel->next_incoming_seqnum++;
1255                         
1256                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1257                         // Get out the inside packet and re-process it
1258                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1259                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1260
1261                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1262                         return true;
1263                 }
1264         }
1265         return false;
1266 }
1267
1268 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1269                 SharedBuffer<u8> packetdata, u16 peer_id,
1270                 u8 channelnum, bool reliable)
1271 {
1272         IndentationRaiser iraiser(&(m_indentation));
1273
1274         if(packetdata.getSize() < 1)
1275                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1276
1277         u8 type = readU8(&packetdata[0]);
1278         
1279         if(type == TYPE_CONTROL)
1280         {
1281                 if(packetdata.getSize() < 2)
1282                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
1283
1284                 u8 controltype = readU8(&packetdata[1]);
1285
1286                 if(controltype == CONTROLTYPE_ACK)
1287                 {
1288                         if(packetdata.getSize() < 4)
1289                                 throw InvalidIncomingDataException
1290                                                 ("packetdata.getSize() < 4 (ACK header size)");
1291
1292                         u16 seqnum = readU16(&packetdata[2]);
1293                         PrintInfo();
1294                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1295                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1296                                         <<", seqnum="<<seqnum<<std::endl;
1297
1298                         try{
1299                                 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1300                                 // Get round trip time
1301                                 float rtt = p.totaltime;
1302
1303                                 // Let peer calculate stuff according to it
1304                                 // (avg_rtt and resend_timeout)
1305                                 Peer *peer = getPeer(peer_id);
1306                                 peer->reportRTT(rtt);
1307
1308                                 //PrintInfo(dout_con);
1309                                 //dout_con<<"RTT = "<<rtt<<std::endl;
1310
1311                                 /*dout_con<<"OUTGOING: ";
1312                                 PrintInfo();
1313                                 channel->outgoing_reliables.print();
1314                                 dout_con<<std::endl;*/
1315                         }
1316                         catch(NotFoundException &e){
1317                                 PrintInfo(derr_con);
1318                                 derr_con<<"WARNING: ACKed packet not "
1319                                                 "in outgoing queue"
1320                                                 <<std::endl;
1321                         }
1322
1323                         throw ProcessedSilentlyException("Got an ACK");
1324                 }
1325                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1326                 {
1327                         if(packetdata.getSize() < 4)
1328                                 throw InvalidIncomingDataException
1329                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1330                         u16 peer_id_new = readU16(&packetdata[2]);
1331                         PrintInfo();
1332                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1333
1334                         if(GetPeerID() != PEER_ID_INEXISTENT)
1335                         {
1336                                 PrintInfo(derr_con);
1337                                 derr_con<<"WARNING: Not changing"
1338                                                 " existing peer id."<<std::endl;
1339                         }
1340                         else
1341                         {
1342                                 dout_con<<"changing."<<std::endl;
1343                                 SetPeerID(peer_id_new);
1344                         }
1345                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
1346                 }
1347                 else if(controltype == CONTROLTYPE_PING)
1348                 {
1349                         // Just ignore it, the incoming data already reset
1350                         // the timeout counter
1351                         PrintInfo();
1352                         dout_con<<"PING"<<std::endl;
1353                         throw ProcessedSilentlyException("Got a PING");
1354                 }
1355                 else if(controltype == CONTROLTYPE_DISCO)
1356                 {
1357                         // Just ignore it, the incoming data already reset
1358                         // the timeout counter
1359                         PrintInfo();
1360                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1361                         
1362                         if(deletePeer(peer_id, false) == false)
1363                         {
1364                                 PrintInfo(derr_con);
1365                                 derr_con<<"DISCO: Peer not found"<<std::endl;
1366                         }
1367
1368                         throw ProcessedSilentlyException("Got a DISCO");
1369                 }
1370                 else{
1371                         PrintInfo(derr_con);
1372                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1373                                         <<((int)controltype&0xff)<<std::endl;
1374                         throw InvalidIncomingDataException("Invalid control type");
1375                 }
1376         }
1377         else if(type == TYPE_ORIGINAL)
1378         {
1379                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1380                         throw InvalidIncomingDataException
1381                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1382                 PrintInfo();
1383                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1384                                 <<std::endl;
1385                 // Get the inside packet out and return it
1386                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1387                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1388                 return payload;
1389         }
1390         else if(type == TYPE_SPLIT)
1391         {
1392                 // We have to create a packet again for buffering
1393                 // This isn't actually too bad an idea.
1394                 BufferedPacket packet = makePacket(
1395                                 getPeer(peer_id)->address,
1396                                 packetdata,
1397                                 GetProtocolID(),
1398                                 peer_id,
1399                                 channelnum);
1400                 // Buffer the packet
1401                 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1402                 if(data.getSize() != 0)
1403                 {
1404                         PrintInfo();
1405                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1406                                         <<"size="<<data.getSize()<<std::endl;
1407                         return data;
1408                 }
1409                 PrintInfo();
1410                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1411                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1412         }
1413         else if(type == TYPE_RELIABLE)
1414         {
1415                 // Recursive reliable packets not allowed
1416                 assert(reliable == false);
1417
1418                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1419                         throw InvalidIncomingDataException
1420                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1421
1422                 u16 seqnum = readU16(&packetdata[1]);
1423
1424                 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1425                 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1426                 
1427                 PrintInfo();
1428                 if(is_future_packet)
1429                         dout_con<<"BUFFERING";
1430                 else if(is_old_packet)
1431                         dout_con<<"OLD";
1432                 else
1433                         dout_con<<"RECUR";
1434                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1435                                 <<" next="<<channel->next_incoming_seqnum;
1436                 dout_con<<" [sending CONTROLTYPE_ACK"
1437                                 " to peer_id="<<peer_id<<"]";
1438                 dout_con<<std::endl;
1439                 
1440                 //DEBUG
1441                 //assert(channel->incoming_reliables.size() < 100);
1442
1443                 // Send a CONTROLTYPE_ACK
1444                 SharedBuffer<u8> reply(4);
1445                 writeU8(&reply[0], TYPE_CONTROL);
1446                 writeU8(&reply[1], CONTROLTYPE_ACK);
1447                 writeU16(&reply[2], seqnum);
1448                 rawSendAsPacket(peer_id, channelnum, reply, false);
1449
1450                 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1451                 if(is_future_packet)
1452                 {
1453                         /*PrintInfo();
1454                         dout_con<<"Buffering reliable packet (seqnum="
1455                                         <<seqnum<<")"<<std::endl;*/
1456                         
1457                         // This one comes later, buffer it.
1458                         // Actually we have to make a packet to buffer one.
1459                         // Well, we have all the ingredients, so just do it.
1460                         BufferedPacket packet = makePacket(
1461                                         getPeer(peer_id)->address,
1462                                         packetdata,
1463                                         GetProtocolID(),
1464                                         peer_id,
1465                                         channelnum);
1466                         try{
1467                                 channel->incoming_reliables.insert(packet);
1468                                 
1469                                 /*PrintInfo();
1470                                 dout_con<<"INCOMING: ";
1471                                 channel->incoming_reliables.print();
1472                                 dout_con<<std::endl;*/
1473                         }
1474                         catch(AlreadyExistsException &e)
1475                         {
1476                         }
1477
1478                         throw ProcessedSilentlyException("Buffered future reliable packet");
1479                 }
1480                 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1481                 else if(is_old_packet)
1482                 {
1483                         // An old packet, dump it
1484                         throw InvalidIncomingDataException("Got an old reliable packet");
1485                 }
1486
1487                 channel->next_incoming_seqnum++;
1488
1489                 // Get out the inside packet and re-process it
1490                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1491                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1492
1493                 return processPacket(channel, payload, peer_id, channelnum, true);
1494         }
1495         else
1496         {
1497                 PrintInfo(derr_con);
1498                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1499                 throw InvalidIncomingDataException("Invalid packet type");
1500         }
1501         
1502         // We should never get here.
1503         // If you get here, add an exception or a return to some of the
1504         // above conditionals.
1505         assert(0);
1506         throw BaseException("Error in Channel::ProcessPacket()");
1507 }
1508
1509 bool Connection::deletePeer(u16 peer_id, bool timeout)
1510 {
1511         if(m_peers.find(peer_id) == NULL)
1512                 return false;
1513         
1514         Peer *peer = m_peers[peer_id];
1515
1516         // Create event
1517         ConnectionEvent e;
1518         e.peerRemoved(peer_id, timeout, peer->address);
1519         putEvent(e);
1520
1521         delete m_peers[peer_id];
1522         m_peers.remove(peer_id);
1523         return true;
1524 }
1525
1526 /* Interface */
1527
1528 ConnectionEvent Connection::getEvent()
1529 {
1530         if(m_event_queue.size() == 0){
1531                 ConnectionEvent e;
1532                 e.type = CONNEVENT_NONE;
1533                 return e;
1534         }
1535         return m_event_queue.pop_front();
1536 }
1537
1538 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1539 {
1540         try{
1541                 return m_event_queue.pop_front(timeout_ms);
1542         } catch(ItemNotFoundException &ex){
1543                 ConnectionEvent e;
1544                 e.type = CONNEVENT_NONE;
1545                 return e;
1546         }
1547 }
1548
1549 void Connection::putCommand(ConnectionCommand &c)
1550 {
1551         m_command_queue.push_back(c);
1552 }
1553
1554 void Connection::Serve(unsigned short port)
1555 {
1556         ConnectionCommand c;
1557         c.serve(port);
1558         putCommand(c);
1559 }
1560
1561 void Connection::Connect(Address address)
1562 {
1563         ConnectionCommand c;
1564         c.connect(address);
1565         putCommand(c);
1566 }
1567
1568 bool Connection::Connected()
1569 {
1570         JMutexAutoLock peerlock(m_peers_mutex);
1571
1572         if(m_peers.size() != 1)
1573                 return false;
1574                 
1575         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1576         if(node == NULL)
1577                 return false;
1578         
1579         if(m_peer_id == PEER_ID_INEXISTENT)
1580                 return false;
1581         
1582         return true;
1583 }
1584
1585 void Connection::Disconnect()
1586 {
1587         ConnectionCommand c;
1588         c.disconnect();
1589         putCommand(c);
1590 }
1591
1592 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1593 {
1594         for(;;){
1595                 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1596                 if(e.type != CONNEVENT_NONE)
1597                         dout_con<<getDesc()<<": Receive: got event: "
1598                                         <<e.describe()<<std::endl;
1599                 switch(e.type){
1600                 case CONNEVENT_NONE:
1601                         throw NoIncomingDataException("No incoming data");
1602                 case CONNEVENT_DATA_RECEIVED:
1603                         peer_id = e.peer_id;
1604                         data = SharedBuffer<u8>(e.data);
1605                         return e.data.getSize();
1606                 case CONNEVENT_PEER_ADDED: {
1607                         Peer tmp(e.peer_id, e.address);
1608                         if(m_bc_peerhandler)
1609                                 m_bc_peerhandler->peerAdded(&tmp);
1610                         continue; }
1611                 case CONNEVENT_PEER_REMOVED: {
1612                         Peer tmp(e.peer_id, e.address);
1613                         if(m_bc_peerhandler)
1614                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1615                         continue; }
1616                 case CONNEVENT_BIND_FAILED:
1617                         throw ConnectionBindFailed("Failed to bind socket "
1618                                         "(port already in use?)");
1619                 }
1620         }
1621         throw NoIncomingDataException("No incoming data");
1622 }
1623
1624 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1625 {
1626         assert(channelnum < CHANNEL_COUNT);
1627
1628         ConnectionCommand c;
1629         c.sendToAll(channelnum, data, reliable);
1630         putCommand(c);
1631 }
1632
1633 void Connection::Send(u16 peer_id, u8 channelnum,
1634                 SharedBuffer<u8> data, bool reliable)
1635 {
1636         assert(channelnum < CHANNEL_COUNT);
1637
1638         ConnectionCommand c;
1639         c.send(peer_id, channelnum, data, reliable);
1640         putCommand(c);
1641 }
1642
1643 void Connection::RunTimeouts(float dtime)
1644 {
1645         // No-op
1646 }
1647
1648 Address Connection::GetPeerAddress(u16 peer_id)
1649 {
1650         JMutexAutoLock peerlock(m_peers_mutex);
1651         return getPeer(peer_id)->address;
1652 }
1653
1654 float Connection::GetPeerAvgRTT(u16 peer_id)
1655 {
1656         JMutexAutoLock peerlock(m_peers_mutex);
1657         return getPeer(peer_id)->avg_rtt;
1658 }
1659
1660 void Connection::DeletePeer(u16 peer_id)
1661 {
1662         ConnectionCommand c;
1663         c.deletePeer(peer_id);
1664         putCommand(c);
1665 }
1666
1667 void Connection::PrintInfo(std::ostream &out)
1668 {
1669         out<<getDesc()<<": ";
1670 }
1671
1672 void Connection::PrintInfo()
1673 {
1674         PrintInfo(dout_con);
1675 }
1676
1677 std::string Connection::getDesc()
1678 {
1679         return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
1680 }
1681
1682 } // namespace
1683