Improve Connection with threading and some kind of congestion control
[oweals/minetest.git] / src / connection.cpp
1 /*
2 Minetest-c55
3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23 #include "log.h"
24 #include "porting.h"
25
26 namespace con
27 {
28
29 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
30                 u32 protocol_id, u16 sender_peer_id, u8 channel)
31 {
32         u32 packet_size = datasize + BASE_HEADER_SIZE;
33         BufferedPacket p(packet_size);
34         p.address = address;
35
36         writeU32(&p.data[0], protocol_id);
37         writeU16(&p.data[4], sender_peer_id);
38         writeU8(&p.data[6], channel);
39
40         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
41
42         return p;
43 }
44
45 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
46                 u32 protocol_id, u16 sender_peer_id, u8 channel)
47 {
48         return makePacket(address, *data, data.getSize(),
49                         protocol_id, sender_peer_id, channel);
50 }
51
52 SharedBuffer<u8> makeOriginalPacket(
53                 SharedBuffer<u8> data)
54 {
55         u32 header_size = 1;
56         u32 packet_size = data.getSize() + header_size;
57         SharedBuffer<u8> b(packet_size);
58
59         writeU8(&b[0], TYPE_ORIGINAL);
60
61         memcpy(&b[header_size], *data, data.getSize());
62
63         return b;
64 }
65
66 core::list<SharedBuffer<u8> > makeSplitPacket(
67                 SharedBuffer<u8> data,
68                 u32 chunksize_max,
69                 u16 seqnum)
70 {
71         // Chunk packets, containing the TYPE_SPLIT header
72         core::list<SharedBuffer<u8> > chunks;
73         
74         u32 chunk_header_size = 7;
75         u32 maximum_data_size = chunksize_max - chunk_header_size;
76         u32 start = 0;
77         u32 end = 0;
78         u32 chunk_num = 0;
79         do{
80                 end = start + maximum_data_size - 1;
81                 if(end > data.getSize() - 1)
82                         end = data.getSize() - 1;
83                 
84                 u32 payload_size = end - start + 1;
85                 u32 packet_size = chunk_header_size + payload_size;
86
87                 SharedBuffer<u8> chunk(packet_size);
88                 
89                 writeU8(&chunk[0], TYPE_SPLIT);
90                 writeU16(&chunk[1], seqnum);
91                 // [3] u16 chunk_count is written at next stage
92                 writeU16(&chunk[5], chunk_num);
93                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
94
95                 chunks.push_back(chunk);
96                 
97                 start = end + 1;
98                 chunk_num++;
99         }
100         while(end != data.getSize() - 1);
101
102         u16 chunk_count = chunks.getSize();
103
104         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
105         for(; i != chunks.end(); i++)
106         {
107                 // Write chunk_count
108                 writeU16(&((*i)[3]), chunk_count);
109         }
110
111         return chunks;
112 }
113
114 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
115                 SharedBuffer<u8> data,
116                 u32 chunksize_max,
117                 u16 &split_seqnum)
118 {
119         u32 original_header_size = 1;
120         core::list<SharedBuffer<u8> > list;
121         if(data.getSize() + original_header_size > chunksize_max)
122         {
123                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
124                 split_seqnum++;
125                 return list;
126         }
127         else
128         {
129                 list.push_back(makeOriginalPacket(data));
130         }
131         return list;
132 }
133
134 SharedBuffer<u8> makeReliablePacket(
135                 SharedBuffer<u8> data,
136                 u16 seqnum)
137 {
138         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
139         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
140                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
141         u32 header_size = 3;
142         u32 packet_size = data.getSize() + header_size;
143         SharedBuffer<u8> b(packet_size);
144
145         writeU8(&b[0], TYPE_RELIABLE);
146         writeU16(&b[1], seqnum);
147
148         memcpy(&b[header_size], *data, data.getSize());
149
150         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
151                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
152         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
153         return b;
154 }
155
156 /*
157         ReliablePacketBuffer
158 */
159
160 void ReliablePacketBuffer::print()
161 {
162         core::list<BufferedPacket>::Iterator i;
163         i = m_list.begin();
164         for(; i != m_list.end(); i++)
165         {
166                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
167                 dout_con<<s<<" ";
168         }
169 }
170 bool ReliablePacketBuffer::empty()
171 {
172         return m_list.empty();
173 }
174 u32 ReliablePacketBuffer::size()
175 {
176         return m_list.getSize();
177 }
178 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
179 {
180         core::list<BufferedPacket>::Iterator i;
181         i = m_list.begin();
182         for(; i != m_list.end(); i++)
183         {
184                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
186                                 <<", comparing to s="<<s<<std::endl;*/
187                 if(s == seqnum)
188                         break;
189         }
190         return i;
191 }
192 RPBSearchResult ReliablePacketBuffer::notFound()
193 {
194         return m_list.end();
195 }
196 u16 ReliablePacketBuffer::getFirstSeqnum()
197 {
198         if(empty())
199                 throw NotFoundException("Buffer is empty");
200         BufferedPacket p = *m_list.begin();
201         return readU16(&p.data[BASE_HEADER_SIZE+1]);
202 }
203 BufferedPacket ReliablePacketBuffer::popFirst()
204 {
205         if(empty())
206                 throw NotFoundException("Buffer is empty");
207         BufferedPacket p = *m_list.begin();
208         core::list<BufferedPacket>::Iterator i = m_list.begin();
209         m_list.erase(i);
210         return p;
211 }
212 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
213 {
214         RPBSearchResult r = findPacket(seqnum);
215         if(r == notFound()){
216                 dout_con<<"Not found"<<std::endl;
217                 throw NotFoundException("seqnum not found in buffer");
218         }
219         BufferedPacket p = *r;
220         m_list.erase(r);
221         return p;
222 }
223 void ReliablePacketBuffer::insert(BufferedPacket &p)
224 {
225         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
226         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
227         assert(type == TYPE_RELIABLE);
228         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
229
230         // Find the right place for the packet and insert it there
231
232         // If list is empty, just add it
233         if(m_list.empty())
234         {
235                 m_list.push_back(p);
236                 // Done.
237                 return;
238         }
239         // Otherwise find the right place
240         core::list<BufferedPacket>::Iterator i;
241         i = m_list.begin();
242         // Find the first packet in the list which has a higher seqnum
243         for(; i != m_list.end(); i++){
244                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
245                 if(s == seqnum){
246                         throw AlreadyExistsException("Same seqnum in list");
247                 }
248                 if(seqnum_higher(s, seqnum)){
249                         break;
250                 }
251         }
252         // If we're at the end of the list, add the packet to the
253         // end of the list
254         if(i == m_list.end())
255         {
256                 m_list.push_back(p);
257                 // Done.
258                 return;
259         }
260         // Insert before i
261         m_list.insert_before(i, p);
262 }
263
264 void ReliablePacketBuffer::incrementTimeouts(float dtime)
265 {
266         core::list<BufferedPacket>::Iterator i;
267         i = m_list.begin();
268         for(; i != m_list.end(); i++){
269                 i->time += dtime;
270                 i->totaltime += dtime;
271         }
272 }
273
274 void ReliablePacketBuffer::resetTimedOuts(float timeout)
275 {
276         core::list<BufferedPacket>::Iterator i;
277         i = m_list.begin();
278         for(; i != m_list.end(); i++){
279                 if(i->time >= timeout)
280                         i->time = 0.0;
281         }
282 }
283
284 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
285 {
286         core::list<BufferedPacket>::Iterator i;
287         i = m_list.begin();
288         for(; i != m_list.end(); i++){
289                 if(i->totaltime >= timeout)
290                         return true;
291         }
292         return false;
293 }
294
295 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
296 {
297         core::list<BufferedPacket> timed_outs;
298         core::list<BufferedPacket>::Iterator i;
299         i = m_list.begin();
300         for(; i != m_list.end(); i++)
301         {
302                 if(i->time >= timeout)
303                         timed_outs.push_back(*i);
304         }
305         return timed_outs;
306 }
307
308 /*
309         IncomingSplitBuffer
310 */
311
312 IncomingSplitBuffer::~IncomingSplitBuffer()
313 {
314         core::map<u16, IncomingSplitPacket*>::Iterator i;
315         i = m_buf.getIterator();
316         for(; i.atEnd() == false; i++)
317         {
318                 delete i.getNode()->getValue();
319         }
320 }
321 /*
322         This will throw a GotSplitPacketException when a full
323         split packet is constructed.
324 */
325 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
326 {
327         u32 headersize = BASE_HEADER_SIZE + 7;
328         assert(p.data.getSize() >= headersize);
329         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
330         assert(type == TYPE_SPLIT);
331         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
332         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
333         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
334
335         // Add if doesn't exist
336         if(m_buf.find(seqnum) == NULL)
337         {
338                 IncomingSplitPacket *sp = new IncomingSplitPacket();
339                 sp->chunk_count = chunk_count;
340                 sp->reliable = reliable;
341                 m_buf[seqnum] = sp;
342         }
343         
344         IncomingSplitPacket *sp = m_buf[seqnum];
345         
346         // TODO: These errors should be thrown or something? Dunno.
347         if(chunk_count != sp->chunk_count)
348                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
349                                 <<" != sp->chunk_count="<<sp->chunk_count
350                                 <<std::endl;
351         if(reliable != sp->reliable)
352                 derr_con<<"Connection: WARNING: reliable="<<reliable
353                                 <<" != sp->reliable="<<sp->reliable
354                                 <<std::endl;
355
356         // If chunk already exists, cancel
357         if(sp->chunks.find(chunk_num) != NULL)
358                 throw AlreadyExistsException("Chunk already in buffer");
359         
360         // Cut chunk data out of packet
361         u32 chunkdatasize = p.data.getSize() - headersize;
362         SharedBuffer<u8> chunkdata(chunkdatasize);
363         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
364         
365         // Set chunk data in buffer
366         sp->chunks[chunk_num] = chunkdata;
367         
368         // If not all chunks are received, return empty buffer
369         if(sp->allReceived() == false)
370                 return SharedBuffer<u8>();
371
372         // Calculate total size
373         u32 totalsize = 0;
374         core::map<u16, SharedBuffer<u8> >::Iterator i;
375         i = sp->chunks.getIterator();
376         for(; i.atEnd() == false; i++)
377         {
378                 totalsize += i.getNode()->getValue().getSize();
379         }
380         
381         SharedBuffer<u8> fulldata(totalsize);
382
383         // Copy chunks to data buffer
384         u32 start = 0;
385         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
386                         chunk_i++)
387         {
388                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
389                 u16 chunkdatasize = buf.getSize();
390                 memcpy(&fulldata[start], *buf, chunkdatasize);
391                 start += chunkdatasize;;
392         }
393
394         // Remove sp from buffer
395         m_buf.remove(seqnum);
396         delete sp;
397
398         return fulldata;
399 }
400 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
401 {
402         core::list<u16> remove_queue;
403         core::map<u16, IncomingSplitPacket*>::Iterator i;
404         i = m_buf.getIterator();
405         for(; i.atEnd() == false; i++)
406         {
407                 IncomingSplitPacket *p = i.getNode()->getValue();
408                 // Reliable ones are not removed by timeout
409                 if(p->reliable == true)
410                         continue;
411                 p->time += dtime;
412                 if(p->time >= timeout)
413                         remove_queue.push_back(i.getNode()->getKey());
414         }
415         core::list<u16>::Iterator j;
416         j = remove_queue.begin();
417         for(; j != remove_queue.end(); j++)
418         {
419                 dout_con<<"NOTE: Removing timed out unreliable split packet"
420                                 <<std::endl;
421                 delete m_buf[*j];
422                 m_buf.remove(*j);
423         }
424 }
425
426 /*
427         Channel
428 */
429
430 Channel::Channel()
431 {
432         next_outgoing_seqnum = SEQNUM_INITIAL;
433         next_incoming_seqnum = SEQNUM_INITIAL;
434         next_outgoing_split_seqnum = SEQNUM_INITIAL;
435 }
436 Channel::~Channel()
437 {
438 }
439
440 /*
441         Peer
442 */
443
444 Peer::Peer(u16 a_id, Address a_address):
445         address(a_address),
446         id(a_id),
447         timeout_counter(0.0),
448         ping_timer(0.0),
449         resend_timeout(0.5),
450         avg_rtt(-1.0),
451         has_sent_with_id(false),
452         m_sendtime_accu(0),
453         m_max_packets_per_second(10),
454         m_num_sent(0),
455         m_max_num_sent(0)
456 {
457 }
458 Peer::~Peer()
459 {
460 }
461
462 void Peer::reportRTT(float rtt)
463 {
464         if(rtt >= 0.0){
465                 if(rtt < 0.01){
466                         if(m_max_packets_per_second < 100)
467                                 m_max_packets_per_second += 10;
468                 } else if(rtt < 0.2){
469                         if(m_max_packets_per_second < 100)
470                                 m_max_packets_per_second += 2;
471                 } else {
472                         if(m_max_packets_per_second > 5)
473                                 m_max_packets_per_second *= 0.5;
474                 }
475         }
476
477         if(rtt < -0.999)
478         {}
479         else if(avg_rtt < 0.0)
480                 avg_rtt = rtt;
481         else
482                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
483         
484         // Calculate resend_timeout
485
486         /*int reliable_count = 0;
487         for(int i=0; i<CHANNEL_COUNT; i++)
488         {
489                 reliable_count += channels[i].outgoing_reliables.size();
490         }
491         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
492                         * ((float)reliable_count * 1);*/
493         
494         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
495         if(timeout < RESEND_TIMEOUT_MIN)
496                 timeout = RESEND_TIMEOUT_MIN;
497         if(timeout > RESEND_TIMEOUT_MAX)
498                 timeout = RESEND_TIMEOUT_MAX;
499         resend_timeout = timeout;
500 }
501                                 
502 /*
503         Connection
504 */
505
506 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
507         m_protocol_id(protocol_id),
508         m_max_packet_size(max_packet_size),
509         m_timeout(timeout),
510         m_peer_id(0),
511         m_bc_peerhandler(NULL),
512         m_bc_receive_timeout(0),
513         m_indentation(0)
514 {
515         m_socket.setTimeoutMs(5);
516
517         Start();
518 }
519
520 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
521                 PeerHandler *peerhandler):
522         m_protocol_id(protocol_id),
523         m_max_packet_size(max_packet_size),
524         m_timeout(timeout),
525         m_peer_id(0),
526         m_bc_peerhandler(peerhandler),
527         m_bc_receive_timeout(0),
528         m_indentation(0)
529 {
530         m_socket.setTimeoutMs(5);
531
532         Start();
533 }
534
535
536 Connection::~Connection()
537 {
538         stop();
539 }
540
541 /* Internal stuff */
542
543 void * Connection::Thread()
544 {
545         ThreadStarted();
546         log_register_thread("Connection");
547
548         dout_con<<"Connection thread started"<<std::endl;
549         
550         u32 curtime = porting::getTimeMs();
551         u32 lasttime = curtime;
552
553         while(getRun())
554         {
555                 BEGIN_DEBUG_EXCEPTION_HANDLER
556                 
557                 lasttime = curtime;
558                 curtime = porting::getTimeMs();
559                 float dtime = (float)(curtime - lasttime) / 1000.;
560                 if(dtime > 0.1)
561                         dtime = 0.1;
562                 if(dtime < 0.0)
563                         dtime = 0.0;
564                 
565                 runTimeouts(dtime);
566
567                 while(m_command_queue.size() != 0){
568                         ConnectionCommand c = m_command_queue.pop_front();
569                         processCommand(c);
570                 }
571
572                 send(dtime);
573
574                 receive();
575                 
576                 END_DEBUG_EXCEPTION_HANDLER(derr_con);
577         }
578
579         return NULL;
580 }
581
582 void Connection::putEvent(ConnectionEvent &e)
583 {
584         assert(e.type != CONNEVENT_NONE);
585         m_event_queue.push_back(e);
586 }
587
588 void Connection::processCommand(ConnectionCommand &c)
589 {
590         switch(c.type){
591         case CONNCMD_NONE:
592                 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
593                 return;
594         case CONNCMD_SERVE:
595                 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
596                                 <<c.port<<std::endl;
597                 serve(c.port);
598                 return;
599         case CONNCMD_CONNECT:
600                 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
601                 connect(c.address);
602                 return;
603         case CONNCMD_DISCONNECT:
604                 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
605                 disconnect();
606                 return;
607         case CONNCMD_SEND:
608                 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
609                 send(c.peer_id, c.channelnum, c.data, c.reliable);
610                 return;
611         case CONNCMD_SEND_TO_ALL:
612                 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
613                 sendToAll(c.channelnum, c.data, c.reliable);
614                 return;
615         case CONNCMD_DELETE_PEER:
616                 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
617                 deletePeer(c.peer_id, false);
618                 return;
619         }
620 }
621
622 void Connection::send(float dtime)
623 {
624         for(core::map<u16, Peer*>::Iterator
625                         j = m_peers.getIterator();
626                         j.atEnd() == false; j++)
627         {
628                 Peer *peer = j.getNode()->getValue();
629                 peer->m_sendtime_accu += dtime;
630                 peer->m_num_sent = 0;
631                 peer->m_max_num_sent = peer->m_sendtime_accu *
632                                 peer->m_max_packets_per_second;
633         }
634         Queue<OutgoingPacket> postponed_packets;
635         while(m_outgoing_queue.size() != 0){
636                 OutgoingPacket packet = m_outgoing_queue.pop_front();
637                 Peer *peer = getPeerNoEx(packet.peer_id);
638                 if(!peer)
639                         continue;
640                 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
641                         postponed_packets.push_back(packet);
642                 } else if(peer->m_num_sent < peer->m_max_num_sent){
643                         rawSendAsPacket(packet.peer_id, packet.channelnum,
644                                         packet.data, packet.reliable);
645                         peer->m_num_sent++;
646                 } else {
647                         postponed_packets.push_back(packet);
648                 }
649         }
650         while(postponed_packets.size() != 0){
651                 m_outgoing_queue.push_back(postponed_packets.pop_front());
652         }
653         for(core::map<u16, Peer*>::Iterator
654                         j = m_peers.getIterator();
655                         j.atEnd() == false; j++)
656         {
657                 Peer *peer = j.getNode()->getValue();
658                 peer->m_sendtime_accu -= (float)peer->m_num_sent /
659                                 peer->m_max_packets_per_second;
660                 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
661                         peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
662         }
663 }
664
665 // Receive packets from the network and buffers and create ConnectionEvents
666 void Connection::receive()
667 {
668         u32 datasize = 100000;
669         // TODO: We can not know how many layers of header there are.
670         // For now, just assume there are no other than the base headers.
671         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
672         Buffer<u8> packetdata(packet_maxsize);
673
674         bool single_wait_done = false;
675         
676         for(;;)
677         {
678         try{
679                 /* Check if some buffer has relevant data */
680                 {
681                         u16 peer_id;
682                         SharedBuffer<u8> resultdata;
683                         bool got = getFromBuffers(peer_id, resultdata);
684                         if(got){
685                                 ConnectionEvent e;
686                                 e.dataReceived(peer_id, resultdata);
687                                 putEvent(e);
688                                 continue;
689                         }
690                 }
691                 
692                 if(single_wait_done){
693                         if(m_socket.WaitData(0) == false)
694                                 break;
695                 }
696                 
697                 single_wait_done = true;
698
699                 Address sender;
700                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
701
702                 if(received_size < 0)
703                         break;
704                 if(received_size < BASE_HEADER_SIZE)
705                         continue;
706                 if(readU32(&packetdata[0]) != m_protocol_id)
707                         continue;
708                 
709                 u16 peer_id = readPeerId(*packetdata);
710                 u8 channelnum = readChannel(*packetdata);
711                 if(channelnum > CHANNEL_COUNT-1){
712                         PrintInfo(derr_con);
713                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
714                         throw InvalidIncomingDataException("Channel doesn't exist");
715                 }
716
717                 if(peer_id == PEER_ID_INEXISTENT)
718                 {
719                         /*
720                                 Somebody is trying to send stuff to us with no peer id.
721                                 
722                                 Check if the same address and port was added to our peer
723                                 list before.
724                                 Allow only entries that have has_sent_with_id==false.
725                         */
726
727                         core::map<u16, Peer*>::Iterator j;
728                         j = m_peers.getIterator();
729                         for(; j.atEnd() == false; j++)
730                         {
731                                 Peer *peer = j.getNode()->getValue();
732                                 if(peer->has_sent_with_id)
733                                         continue;
734                                 if(peer->address == sender)
735                                         break;
736                         }
737                         
738                         /*
739                                 If no peer was found with the same address and port,
740                                 we shall assume it is a new peer and create an entry.
741                         */
742                         if(j.atEnd())
743                         {
744                                 // Pass on to adding the peer
745                         }
746                         // Else: A peer was found.
747                         else
748                         {
749                                 Peer *peer = j.getNode()->getValue();
750                                 peer_id = peer->id;
751                                 PrintInfo(derr_con);
752                                 derr_con<<"WARNING: Assuming unknown peer to be "
753                                                 <<"peer_id="<<peer_id<<std::endl;
754                         }
755                 }
756                 
757                 /*
758                         The peer was not found in our lists. Add it.
759                 */
760                 if(peer_id == PEER_ID_INEXISTENT)
761                 {
762                         // Somebody wants to make a new connection
763
764                         // Get a unique peer id (2 or higher)
765                         u16 peer_id_new = 2;
766                         /*
767                                 Find an unused peer id
768                         */
769                         bool out_of_ids = false;
770                         for(;;)
771                         {
772                                 // Check if exists
773                                 if(m_peers.find(peer_id_new) == NULL)
774                                         break;
775                                 // Check for overflow
776                                 if(peer_id_new == 65535){
777                                         out_of_ids = true;
778                                         break;
779                                 }
780                                 peer_id_new++;
781                         }
782                         if(out_of_ids){
783                                 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
784                                 continue;
785                         }
786
787                         PrintInfo();
788                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
789                                         " giving peer_id="<<peer_id_new<<std::endl;
790
791                         // Create a peer
792                         Peer *peer = new Peer(peer_id_new, sender);
793                         m_peers.insert(peer->id, peer);
794                         
795                         // Create peer addition event
796                         ConnectionEvent e;
797                         e.peerAdded(peer_id_new, sender);
798                         putEvent(e);
799                         
800                         // Create CONTROL packet to tell the peer id to the new peer.
801                         SharedBuffer<u8> reply(4);
802                         writeU8(&reply[0], TYPE_CONTROL);
803                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
804                         writeU16(&reply[2], peer_id_new);
805                         sendAsPacket(peer_id_new, 0, reply, true);
806                         
807                         // We're now talking to a valid peer_id
808                         peer_id = peer_id_new;
809
810                         // Go on and process whatever it sent
811                 }
812
813                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
814
815                 if(node == NULL)
816                 {
817                         // Peer not found
818                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
819                         // and it is invalid.
820                         PrintInfo(derr_con);
821                         derr_con<<"Receive(): Peer not found"<<std::endl;
822                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
823                 }
824
825                 Peer *peer = node->getValue();
826
827                 // Validate peer address
828                 if(peer->address != sender)
829                 {
830                         PrintInfo(derr_con);
831                         derr_con<<"Peer "<<peer_id<<" sending from different address."
832                                         " Ignoring."<<std::endl;
833                         continue;
834                 }
835                 
836                 peer->timeout_counter = 0.0;
837
838                 Channel *channel = &(peer->channels[channelnum]);
839                 
840                 // Throw the received packet to channel->processPacket()
841
842                 // Make a new SharedBuffer from the data without the base headers
843                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
844                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
845                                 strippeddata.getSize());
846                 
847                 try{
848                         // Process it (the result is some data with no headers made by us)
849                         SharedBuffer<u8> resultdata = processPacket
850                                         (channel, strippeddata, peer_id, channelnum, false);
851                         
852                         PrintInfo();
853                         dout_con<<"ProcessPacket returned data of size "
854                                         <<resultdata.getSize()<<std::endl;
855                         
856                         if(datasize < resultdata.getSize())
857                                 throw InvalidIncomingDataException
858                                                 ("Buffer too small for received data");
859                         
860                         ConnectionEvent e;
861                         e.dataReceived(peer_id, resultdata);
862                         putEvent(e);
863                         continue;
864                 }catch(ProcessedSilentlyException &e){
865                 }
866         }catch(InvalidIncomingDataException &e){
867         }
868         catch(ProcessedSilentlyException &e){
869         }
870         } // for
871 }
872
873 void Connection::runTimeouts(float dtime)
874 {
875         core::list<u16> timeouted_peers;
876         core::map<u16, Peer*>::Iterator j;
877         j = m_peers.getIterator();
878         for(; j.atEnd() == false; j++)
879         {
880                 Peer *peer = j.getNode()->getValue();
881                 
882                 /*
883                         Check peer timeout
884                 */
885                 peer->timeout_counter += dtime;
886                 if(peer->timeout_counter > m_timeout)
887                 {
888                         PrintInfo(derr_con);
889                         derr_con<<"RunTimeouts(): Peer "<<peer->id
890                                         <<" has timed out."
891                                         <<" (source=peer->timeout_counter)"
892                                         <<std::endl;
893                         // Add peer to the list
894                         timeouted_peers.push_back(peer->id);
895                         // Don't bother going through the buffers of this one
896                         continue;
897                 }
898
899                 float resend_timeout = peer->resend_timeout;
900                 for(u16 i=0; i<CHANNEL_COUNT; i++)
901                 {
902                         core::list<BufferedPacket> timed_outs;
903                         core::list<BufferedPacket>::Iterator j;
904                         
905                         Channel *channel = &peer->channels[i];
906
907                         // Remove timed out incomplete unreliable split packets
908                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
909                         
910                         // Increment reliable packet times
911                         channel->outgoing_reliables.incrementTimeouts(dtime);
912
913                         // Check reliable packet total times, remove peer if
914                         // over timeout.
915                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
916                         {
917                                 PrintInfo(derr_con);
918                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
919                                                 <<" has timed out."
920                                                 <<" (source=reliable packet totaltime)"
921                                                 <<std::endl;
922                                 // Add peer to the to-be-removed list
923                                 timeouted_peers.push_back(peer->id);
924                                 goto nextpeer;
925                         }
926
927                         // Re-send timed out outgoing reliables
928                         
929                         timed_outs = channel->
930                                         outgoing_reliables.getTimedOuts(resend_timeout);
931
932                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
933
934                         j = timed_outs.begin();
935                         for(; j != timed_outs.end(); j++)
936                         {
937                                 u16 peer_id = readPeerId(*(j->data));
938                                 u8 channel = readChannel(*(j->data));
939                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
940
941                                 PrintInfo(derr_con);
942                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
943                                 j->address.print(&derr_con);
944                                 derr_con<<"(t/o="<<resend_timeout<<"): "
945                                                 <<"from_peer_id="<<peer_id
946                                                 <<", channel="<<((int)channel&0xff)
947                                                 <<", seqnum="<<seqnum
948                                                 <<std::endl;
949
950                                 rawSend(*j);
951
952                                 // Enlarge avg_rtt and resend_timeout:
953                                 // The rtt will be at least the timeout.
954                                 // NOTE: This won't affect the timeout of the next
955                                 // checked channel because it was cached.
956                                 peer->reportRTT(resend_timeout);
957                         }
958                 }
959                 
960                 /*
961                         Send pings
962                 */
963                 peer->ping_timer += dtime;
964                 if(peer->ping_timer >= 5.0)
965                 {
966                         // Create and send PING packet
967                         SharedBuffer<u8> data(2);
968                         writeU8(&data[0], TYPE_CONTROL);
969                         writeU8(&data[1], CONTROLTYPE_PING);
970                         rawSendAsPacket(peer->id, 0, data, true);
971
972                         peer->ping_timer = 0.0;
973                 }
974                 
975 nextpeer:
976                 continue;
977         }
978
979         // Remove timed out peers
980         core::list<u16>::Iterator i = timeouted_peers.begin();
981         for(; i != timeouted_peers.end(); i++)
982         {
983                 PrintInfo(derr_con);
984                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
985                 deletePeer(*i, true);
986         }
987 }
988
989 void Connection::serve(u16 port)
990 {
991         dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
992         m_socket.Bind(port);
993         m_peer_id = PEER_ID_SERVER;
994 }
995
996 void Connection::connect(Address address)
997 {
998         dout_con<<getDesc()<<" connecting to "<<address.serializeString()
999                         <<":"<<address.getPort()<<std::endl;
1000
1001         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1002         if(node != NULL){
1003                 throw ConnectionException("Already connected to a server");
1004         }
1005
1006         Peer *peer = new Peer(PEER_ID_SERVER, address);
1007         m_peers.insert(peer->id, peer);
1008
1009         // Create event
1010         ConnectionEvent e;
1011         e.peerAdded(peer->id, peer->address);
1012         putEvent(e);
1013         
1014         m_socket.Bind(0);
1015         
1016         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1017         m_peer_id = PEER_ID_INEXISTENT;
1018         SharedBuffer<u8> data(0);
1019         Send(PEER_ID_SERVER, 0, data, true);
1020 }
1021
1022 void Connection::disconnect()
1023 {
1024         dout_con<<getDesc()<<" disconnecting"<<std::endl;
1025
1026         // Create and send DISCO packet
1027         SharedBuffer<u8> data(2);
1028         writeU8(&data[0], TYPE_CONTROL);
1029         writeU8(&data[1], CONTROLTYPE_DISCO);
1030         
1031         // Send to all
1032         core::map<u16, Peer*>::Iterator j;
1033         j = m_peers.getIterator();
1034         for(; j.atEnd() == false; j++)
1035         {
1036                 Peer *peer = j.getNode()->getValue();
1037                 rawSendAsPacket(peer->id, 0, data, false);
1038         }
1039 }
1040
1041 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1042 {
1043         core::map<u16, Peer*>::Iterator j;
1044         j = m_peers.getIterator();
1045         for(; j.atEnd() == false; j++)
1046         {
1047                 Peer *peer = j.getNode()->getValue();
1048                 send(peer->id, channelnum, data, reliable);
1049         }
1050 }
1051
1052 void Connection::send(u16 peer_id, u8 channelnum,
1053                 SharedBuffer<u8> data, bool reliable)
1054 {
1055         dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1056
1057         assert(channelnum < CHANNEL_COUNT);
1058         
1059         Peer *peer = getPeerNoEx(peer_id);
1060         if(peer == NULL)
1061                 return;
1062         Channel *channel = &(peer->channels[channelnum]);
1063
1064         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1065         if(reliable)
1066                 chunksize_max -= RELIABLE_HEADER_SIZE;
1067
1068         core::list<SharedBuffer<u8> > originals;
1069         originals = makeAutoSplitPacket(data, chunksize_max,
1070                         channel->next_outgoing_split_seqnum);
1071         
1072         core::list<SharedBuffer<u8> >::Iterator i;
1073         i = originals.begin();
1074         for(; i != originals.end(); i++)
1075         {
1076                 SharedBuffer<u8> original = *i;
1077                 
1078                 sendAsPacket(peer_id, channelnum, original, reliable);
1079         }
1080 }
1081
1082 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1083                 SharedBuffer<u8> data, bool reliable)
1084 {
1085         OutgoingPacket packet(peer_id, channelnum, data, reliable);
1086         m_outgoing_queue.push_back(packet);
1087 }
1088
1089 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1090                 SharedBuffer<u8> data, bool reliable)
1091 {
1092         Peer *peer = getPeerNoEx(peer_id);
1093         if(!peer)
1094                 return;
1095         Channel *channel = &(peer->channels[channelnum]);
1096
1097         if(reliable)
1098         {
1099                 u16 seqnum = channel->next_outgoing_seqnum;
1100                 channel->next_outgoing_seqnum++;
1101
1102                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1103
1104                 // Add base headers and make a packet
1105                 BufferedPacket p = makePacket(peer->address, reliable,
1106                                 m_protocol_id, m_peer_id, channelnum);
1107                 
1108                 try{
1109                         // Buffer the packet
1110                         channel->outgoing_reliables.insert(p);
1111                 }
1112                 catch(AlreadyExistsException &e)
1113                 {
1114                         PrintInfo(derr_con);
1115                         derr_con<<"WARNING: Going to send a reliable packet "
1116                                         "seqnum="<<seqnum<<" that is already "
1117                                         "in outgoing buffer"<<std::endl;
1118                         //assert(0);
1119                 }
1120                 
1121                 // Send the packet
1122                 rawSend(p);
1123         }
1124         else
1125         {
1126                 // Add base headers and make a packet
1127                 BufferedPacket p = makePacket(peer->address, data,
1128                                 m_protocol_id, m_peer_id, channelnum);
1129
1130                 // Send the packet
1131                 rawSend(p);
1132         }
1133 }
1134
1135 void Connection::rawSend(const BufferedPacket &packet)
1136 {
1137         try{
1138                 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1139         } catch(SendFailedException &e){
1140                 derr_con<<"Connection::rawSend(): SendFailedException: "
1141                                 <<packet.address.serializeString()<<std::endl;
1142         }
1143 }
1144
1145 Peer* Connection::getPeer(u16 peer_id)
1146 {
1147         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1148
1149         if(node == NULL){
1150                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1151         }
1152
1153         // Error checking
1154         assert(node->getValue()->id == peer_id);
1155
1156         return node->getValue();
1157 }
1158
1159 Peer* Connection::getPeerNoEx(u16 peer_id)
1160 {
1161         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1162
1163         if(node == NULL){
1164                 return NULL;
1165         }
1166
1167         // Error checking
1168         assert(node->getValue()->id == peer_id);
1169
1170         return node->getValue();
1171 }
1172
1173 core::list<Peer*> Connection::getPeers()
1174 {
1175         core::list<Peer*> list;
1176         core::map<u16, Peer*>::Iterator j;
1177         j = m_peers.getIterator();
1178         for(; j.atEnd() == false; j++)
1179         {
1180                 Peer *peer = j.getNode()->getValue();
1181                 list.push_back(peer);
1182         }
1183         return list;
1184 }
1185
1186 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1187 {
1188         core::map<u16, Peer*>::Iterator j;
1189         j = m_peers.getIterator();
1190         for(; j.atEnd() == false; j++)
1191         {
1192                 Peer *peer = j.getNode()->getValue();
1193                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1194                 {
1195                         Channel *channel = &peer->channels[i];
1196                         SharedBuffer<u8> resultdata;
1197                         bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1198                         if(got){
1199                                 dst = resultdata;
1200                                 return true;
1201                         }
1202                 }
1203         }
1204         return false;
1205 }
1206
1207 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1208                 SharedBuffer<u8> &dst)
1209 {
1210         u16 firstseqnum = 0;
1211         // Clear old packets from start of buffer
1212         try{
1213         for(;;){
1214                 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1215                 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1216                         channel->incoming_reliables.popFirst();
1217                 else
1218                         break;
1219         }
1220         // This happens if all packets are old
1221         }catch(con::NotFoundException)
1222         {}
1223         
1224         if(channel->incoming_reliables.empty() == false)
1225         {
1226                 if(firstseqnum == channel->next_incoming_seqnum)
1227                 {
1228                         BufferedPacket p = channel->incoming_reliables.popFirst();
1229                         
1230                         peer_id = readPeerId(*p.data);
1231                         u8 channelnum = readChannel(*p.data);
1232                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1233
1234                         PrintInfo();
1235                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
1236                                         <<" seqnum="<<seqnum
1237                                         <<" peer_id="<<peer_id
1238                                         <<" channel="<<((int)channelnum&0xff)
1239                                         <<std::endl;
1240
1241                         channel->next_incoming_seqnum++;
1242                         
1243                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1244                         // Get out the inside packet and re-process it
1245                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1246                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1247
1248                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1249                         return true;
1250                 }
1251         }
1252         return false;
1253 }
1254
1255 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1256                 SharedBuffer<u8> packetdata, u16 peer_id,
1257                 u8 channelnum, bool reliable)
1258 {
1259         IndentationRaiser iraiser(&(m_indentation));
1260
1261         if(packetdata.getSize() < 1)
1262                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1263
1264         u8 type = readU8(&packetdata[0]);
1265         
1266         if(type == TYPE_CONTROL)
1267         {
1268                 if(packetdata.getSize() < 2)
1269                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
1270
1271                 u8 controltype = readU8(&packetdata[1]);
1272
1273                 if(controltype == CONTROLTYPE_ACK)
1274                 {
1275                         if(packetdata.getSize() < 4)
1276                                 throw InvalidIncomingDataException
1277                                                 ("packetdata.getSize() < 4 (ACK header size)");
1278
1279                         u16 seqnum = readU16(&packetdata[2]);
1280                         PrintInfo();
1281                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1282                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1283                                         <<", seqnum="<<seqnum<<std::endl;
1284
1285                         try{
1286                                 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1287                                 // Get round trip time
1288                                 float rtt = p.totaltime;
1289
1290                                 // Let peer calculate stuff according to it
1291                                 // (avg_rtt and resend_timeout)
1292                                 Peer *peer = getPeer(peer_id);
1293                                 peer->reportRTT(rtt);
1294
1295                                 //PrintInfo(dout_con);
1296                                 //dout_con<<"RTT = "<<rtt<<std::endl;
1297
1298                                 /*dout_con<<"OUTGOING: ";
1299                                 PrintInfo();
1300                                 channel->outgoing_reliables.print();
1301                                 dout_con<<std::endl;*/
1302                         }
1303                         catch(NotFoundException &e){
1304                                 PrintInfo(derr_con);
1305                                 derr_con<<"WARNING: ACKed packet not "
1306                                                 "in outgoing queue"
1307                                                 <<std::endl;
1308                         }
1309
1310                         throw ProcessedSilentlyException("Got an ACK");
1311                 }
1312                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1313                 {
1314                         if(packetdata.getSize() < 4)
1315                                 throw InvalidIncomingDataException
1316                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1317                         u16 peer_id_new = readU16(&packetdata[2]);
1318                         PrintInfo();
1319                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1320
1321                         if(GetPeerID() != PEER_ID_INEXISTENT)
1322                         {
1323                                 PrintInfo(derr_con);
1324                                 derr_con<<"WARNING: Not changing"
1325                                                 " existing peer id."<<std::endl;
1326                         }
1327                         else
1328                         {
1329                                 dout_con<<"changing."<<std::endl;
1330                                 SetPeerID(peer_id_new);
1331                         }
1332                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
1333                 }
1334                 else if(controltype == CONTROLTYPE_PING)
1335                 {
1336                         // Just ignore it, the incoming data already reset
1337                         // the timeout counter
1338                         PrintInfo();
1339                         dout_con<<"PING"<<std::endl;
1340                         throw ProcessedSilentlyException("Got a PING");
1341                 }
1342                 else if(controltype == CONTROLTYPE_DISCO)
1343                 {
1344                         // Just ignore it, the incoming data already reset
1345                         // the timeout counter
1346                         PrintInfo();
1347                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1348                         
1349                         if(deletePeer(peer_id, false) == false)
1350                         {
1351                                 PrintInfo(derr_con);
1352                                 derr_con<<"DISCO: Peer not found"<<std::endl;
1353                         }
1354
1355                         throw ProcessedSilentlyException("Got a DISCO");
1356                 }
1357                 else{
1358                         PrintInfo(derr_con);
1359                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1360                                         <<((int)controltype&0xff)<<std::endl;
1361                         throw InvalidIncomingDataException("Invalid control type");
1362                 }
1363         }
1364         else if(type == TYPE_ORIGINAL)
1365         {
1366                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1367                         throw InvalidIncomingDataException
1368                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1369                 PrintInfo();
1370                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1371                                 <<std::endl;
1372                 // Get the inside packet out and return it
1373                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1374                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1375                 return payload;
1376         }
1377         else if(type == TYPE_SPLIT)
1378         {
1379                 // We have to create a packet again for buffering
1380                 // This isn't actually too bad an idea.
1381                 BufferedPacket packet = makePacket(
1382                                 getPeer(peer_id)->address,
1383                                 packetdata,
1384                                 GetProtocolID(),
1385                                 peer_id,
1386                                 channelnum);
1387                 // Buffer the packet
1388                 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1389                 if(data.getSize() != 0)
1390                 {
1391                         PrintInfo();
1392                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1393                                         <<"size="<<data.getSize()<<std::endl;
1394                         return data;
1395                 }
1396                 PrintInfo();
1397                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1398                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1399         }
1400         else if(type == TYPE_RELIABLE)
1401         {
1402                 // Recursive reliable packets not allowed
1403                 assert(reliable == false);
1404
1405                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1406                         throw InvalidIncomingDataException
1407                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1408
1409                 u16 seqnum = readU16(&packetdata[1]);
1410
1411                 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1412                 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1413                 
1414                 PrintInfo();
1415                 if(is_future_packet)
1416                         dout_con<<"BUFFERING";
1417                 else if(is_old_packet)
1418                         dout_con<<"OLD";
1419                 else
1420                         dout_con<<"RECUR";
1421                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1422                                 <<" next="<<channel->next_incoming_seqnum;
1423                 dout_con<<" [sending CONTROLTYPE_ACK"
1424                                 " to peer_id="<<peer_id<<"]";
1425                 dout_con<<std::endl;
1426                 
1427                 //DEBUG
1428                 //assert(channel->incoming_reliables.size() < 100);
1429
1430                 // Send a CONTROLTYPE_ACK
1431                 SharedBuffer<u8> reply(4);
1432                 writeU8(&reply[0], TYPE_CONTROL);
1433                 writeU8(&reply[1], CONTROLTYPE_ACK);
1434                 writeU16(&reply[2], seqnum);
1435                 rawSendAsPacket(peer_id, channelnum, reply, false);
1436
1437                 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1438                 if(is_future_packet)
1439                 {
1440                         /*PrintInfo();
1441                         dout_con<<"Buffering reliable packet (seqnum="
1442                                         <<seqnum<<")"<<std::endl;*/
1443                         
1444                         // This one comes later, buffer it.
1445                         // Actually we have to make a packet to buffer one.
1446                         // Well, we have all the ingredients, so just do it.
1447                         BufferedPacket packet = makePacket(
1448                                         getPeer(peer_id)->address,
1449                                         packetdata,
1450                                         GetProtocolID(),
1451                                         peer_id,
1452                                         channelnum);
1453                         try{
1454                                 channel->incoming_reliables.insert(packet);
1455                                 
1456                                 /*PrintInfo();
1457                                 dout_con<<"INCOMING: ";
1458                                 channel->incoming_reliables.print();
1459                                 dout_con<<std::endl;*/
1460                         }
1461                         catch(AlreadyExistsException &e)
1462                         {
1463                         }
1464
1465                         throw ProcessedSilentlyException("Buffered future reliable packet");
1466                 }
1467                 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1468                 else if(is_old_packet)
1469                 {
1470                         // An old packet, dump it
1471                         throw InvalidIncomingDataException("Got an old reliable packet");
1472                 }
1473
1474                 channel->next_incoming_seqnum++;
1475
1476                 // Get out the inside packet and re-process it
1477                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1478                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1479
1480                 return processPacket(channel, payload, peer_id, channelnum, true);
1481         }
1482         else
1483         {
1484                 PrintInfo(derr_con);
1485                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1486                 throw InvalidIncomingDataException("Invalid packet type");
1487         }
1488         
1489         // We should never get here.
1490         // If you get here, add an exception or a return to some of the
1491         // above conditionals.
1492         assert(0);
1493         throw BaseException("Error in Channel::ProcessPacket()");
1494 }
1495
1496 bool Connection::deletePeer(u16 peer_id, bool timeout)
1497 {
1498         if(m_peers.find(peer_id) == NULL)
1499                 return false;
1500         
1501         Peer *peer = m_peers[peer_id];
1502
1503         // Create event
1504         ConnectionEvent e;
1505         e.peerRemoved(peer_id, timeout, peer->address);
1506         putEvent(e);
1507
1508         delete m_peers[peer_id];
1509         m_peers.remove(peer_id);
1510         return true;
1511 }
1512
1513 /* Interface */
1514
1515 ConnectionEvent Connection::getEvent()
1516 {
1517         if(m_event_queue.size() == 0){
1518                 ConnectionEvent e;
1519                 e.type = CONNEVENT_NONE;
1520                 return e;
1521         }
1522         return m_event_queue.pop_front();
1523 }
1524
1525 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1526 {
1527         try{
1528                 return m_event_queue.pop_front(timeout_ms);
1529         } catch(ItemNotFoundException &e){
1530                 ConnectionEvent e;
1531                 e.type = CONNEVENT_NONE;
1532                 return e;
1533         }
1534 }
1535
1536 void Connection::putCommand(ConnectionCommand &c)
1537 {
1538         m_command_queue.push_back(c);
1539 }
1540
1541 void Connection::Serve(unsigned short port)
1542 {
1543         ConnectionCommand c;
1544         c.serve(port);
1545         putCommand(c);
1546 }
1547
1548 void Connection::Connect(Address address)
1549 {
1550         ConnectionCommand c;
1551         c.connect(address);
1552         putCommand(c);
1553 }
1554
1555 bool Connection::Connected()
1556 {
1557         JMutexAutoLock peerlock(m_peers_mutex);
1558
1559         if(m_peers.size() != 1)
1560                 return false;
1561                 
1562         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1563         if(node == NULL)
1564                 return false;
1565         
1566         if(m_peer_id == PEER_ID_INEXISTENT)
1567                 return false;
1568         
1569         return true;
1570 }
1571
1572 void Connection::Disconnect()
1573 {
1574         ConnectionCommand c;
1575         c.disconnect();
1576         putCommand(c);
1577 }
1578
1579 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
1580 {
1581         for(;;){
1582                 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1583                 if(e.type != CONNEVENT_NONE)
1584                         dout_con<<getDesc()<<": Receive: got event: "
1585                                         <<e.describe()<<std::endl;
1586                 switch(e.type){
1587                 case CONNEVENT_NONE:
1588                         throw NoIncomingDataException("No incoming data");
1589                 case CONNEVENT_DATA_RECEIVED:
1590                         peer_id = e.peer_id;
1591                         memcpy(data, *e.data, e.data.getSize());
1592                         return e.data.getSize();
1593                 case CONNEVENT_PEER_ADDED: {
1594                         Peer tmp(e.peer_id, e.address);
1595                         if(m_bc_peerhandler)
1596                                 m_bc_peerhandler->peerAdded(&tmp);
1597                         continue; }
1598                 case CONNEVENT_PEER_REMOVED: {
1599                         Peer tmp(e.peer_id, e.address);
1600                         if(m_bc_peerhandler)
1601                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1602                         continue; }
1603                 }
1604         }
1605         throw NoIncomingDataException("No incoming data");
1606 }
1607
1608 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1609 {
1610         assert(channelnum < CHANNEL_COUNT);
1611
1612         ConnectionCommand c;
1613         c.sendToAll(channelnum, data, reliable);
1614         putCommand(c);
1615 }
1616
1617 void Connection::Send(u16 peer_id, u8 channelnum,
1618                 SharedBuffer<u8> data, bool reliable)
1619 {
1620         assert(channelnum < CHANNEL_COUNT);
1621
1622         ConnectionCommand c;
1623         c.send(peer_id, channelnum, data, reliable);
1624         putCommand(c);
1625 }
1626
1627 void Connection::RunTimeouts(float dtime)
1628 {
1629         // No-op
1630 }
1631
1632 Address Connection::GetPeerAddress(u16 peer_id)
1633 {
1634         JMutexAutoLock peerlock(m_peers_mutex);
1635         return getPeer(peer_id)->address;
1636 }
1637
1638 float Connection::GetPeerAvgRTT(u16 peer_id)
1639 {
1640         JMutexAutoLock peerlock(m_peers_mutex);
1641         return getPeer(peer_id)->avg_rtt;
1642 }
1643
1644 void Connection::DeletePeer(u16 peer_id)
1645 {
1646         ConnectionCommand c;
1647         c.deletePeer(peer_id);
1648         putCommand(c);
1649 }
1650
1651 void Connection::PrintInfo(std::ostream &out)
1652 {
1653         out<<getDesc()<<": ";
1654 }
1655
1656 void Connection::PrintInfo()
1657 {
1658         PrintInfo(dout_con);
1659 }
1660
1661 std::string Connection::getDesc()
1662 {
1663         return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
1664 }
1665
1666 } // namespace
1667