utility.h: Change Buffer's interface to be more compatible with SharedBuffer's interf...
[oweals/minetest.git] / src / connection.cpp
1 /*
2 Minetest-c55
3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23 #include "log.h"
24 #include "porting.h"
25
26 namespace con
27 {
28
29 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
30                 u32 protocol_id, u16 sender_peer_id, u8 channel)
31 {
32         u32 packet_size = datasize + BASE_HEADER_SIZE;
33         BufferedPacket p(packet_size);
34         p.address = address;
35
36         writeU32(&p.data[0], protocol_id);
37         writeU16(&p.data[4], sender_peer_id);
38         writeU8(&p.data[6], channel);
39
40         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
41
42         return p;
43 }
44
45 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
46                 u32 protocol_id, u16 sender_peer_id, u8 channel)
47 {
48         return makePacket(address, *data, data.getSize(),
49                         protocol_id, sender_peer_id, channel);
50 }
51
52 SharedBuffer<u8> makeOriginalPacket(
53                 SharedBuffer<u8> data)
54 {
55         u32 header_size = 1;
56         u32 packet_size = data.getSize() + header_size;
57         SharedBuffer<u8> b(packet_size);
58
59         writeU8(&b[0], TYPE_ORIGINAL);
60
61         memcpy(&b[header_size], *data, data.getSize());
62
63         return b;
64 }
65
66 core::list<SharedBuffer<u8> > makeSplitPacket(
67                 SharedBuffer<u8> data,
68                 u32 chunksize_max,
69                 u16 seqnum)
70 {
71         // Chunk packets, containing the TYPE_SPLIT header
72         core::list<SharedBuffer<u8> > chunks;
73         
74         u32 chunk_header_size = 7;
75         u32 maximum_data_size = chunksize_max - chunk_header_size;
76         u32 start = 0;
77         u32 end = 0;
78         u32 chunk_num = 0;
79         do{
80                 end = start + maximum_data_size - 1;
81                 if(end > data.getSize() - 1)
82                         end = data.getSize() - 1;
83                 
84                 u32 payload_size = end - start + 1;
85                 u32 packet_size = chunk_header_size + payload_size;
86
87                 SharedBuffer<u8> chunk(packet_size);
88                 
89                 writeU8(&chunk[0], TYPE_SPLIT);
90                 writeU16(&chunk[1], seqnum);
91                 // [3] u16 chunk_count is written at next stage
92                 writeU16(&chunk[5], chunk_num);
93                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
94
95                 chunks.push_back(chunk);
96                 
97                 start = end + 1;
98                 chunk_num++;
99         }
100         while(end != data.getSize() - 1);
101
102         u16 chunk_count = chunks.getSize();
103
104         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
105         for(; i != chunks.end(); i++)
106         {
107                 // Write chunk_count
108                 writeU16(&((*i)[3]), chunk_count);
109         }
110
111         return chunks;
112 }
113
114 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
115                 SharedBuffer<u8> data,
116                 u32 chunksize_max,
117                 u16 &split_seqnum)
118 {
119         u32 original_header_size = 1;
120         core::list<SharedBuffer<u8> > list;
121         if(data.getSize() + original_header_size > chunksize_max)
122         {
123                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
124                 split_seqnum++;
125                 return list;
126         }
127         else
128         {
129                 list.push_back(makeOriginalPacket(data));
130         }
131         return list;
132 }
133
134 SharedBuffer<u8> makeReliablePacket(
135                 SharedBuffer<u8> data,
136                 u16 seqnum)
137 {
138         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
139         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
140                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
141         u32 header_size = 3;
142         u32 packet_size = data.getSize() + header_size;
143         SharedBuffer<u8> b(packet_size);
144
145         writeU8(&b[0], TYPE_RELIABLE);
146         writeU16(&b[1], seqnum);
147
148         memcpy(&b[header_size], *data, data.getSize());
149
150         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
151                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
152         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
153         return b;
154 }
155
156 /*
157         ReliablePacketBuffer
158 */
159
160 void ReliablePacketBuffer::print()
161 {
162         core::list<BufferedPacket>::Iterator i;
163         i = m_list.begin();
164         for(; i != m_list.end(); i++)
165         {
166                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
167                 dout_con<<s<<" ";
168         }
169 }
170 bool ReliablePacketBuffer::empty()
171 {
172         return m_list.empty();
173 }
174 u32 ReliablePacketBuffer::size()
175 {
176         return m_list.getSize();
177 }
178 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
179 {
180         core::list<BufferedPacket>::Iterator i;
181         i = m_list.begin();
182         for(; i != m_list.end(); i++)
183         {
184                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
186                                 <<", comparing to s="<<s<<std::endl;*/
187                 if(s == seqnum)
188                         break;
189         }
190         return i;
191 }
192 RPBSearchResult ReliablePacketBuffer::notFound()
193 {
194         return m_list.end();
195 }
196 u16 ReliablePacketBuffer::getFirstSeqnum()
197 {
198         if(empty())
199                 throw NotFoundException("Buffer is empty");
200         BufferedPacket p = *m_list.begin();
201         return readU16(&p.data[BASE_HEADER_SIZE+1]);
202 }
203 BufferedPacket ReliablePacketBuffer::popFirst()
204 {
205         if(empty())
206                 throw NotFoundException("Buffer is empty");
207         BufferedPacket p = *m_list.begin();
208         core::list<BufferedPacket>::Iterator i = m_list.begin();
209         m_list.erase(i);
210         return p;
211 }
212 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
213 {
214         RPBSearchResult r = findPacket(seqnum);
215         if(r == notFound()){
216                 dout_con<<"Not found"<<std::endl;
217                 throw NotFoundException("seqnum not found in buffer");
218         }
219         BufferedPacket p = *r;
220         m_list.erase(r);
221         return p;
222 }
223 void ReliablePacketBuffer::insert(BufferedPacket &p)
224 {
225         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
226         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
227         assert(type == TYPE_RELIABLE);
228         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
229
230         // Find the right place for the packet and insert it there
231
232         // If list is empty, just add it
233         if(m_list.empty())
234         {
235                 m_list.push_back(p);
236                 // Done.
237                 return;
238         }
239         // Otherwise find the right place
240         core::list<BufferedPacket>::Iterator i;
241         i = m_list.begin();
242         // Find the first packet in the list which has a higher seqnum
243         for(; i != m_list.end(); i++){
244                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
245                 if(s == seqnum){
246                         throw AlreadyExistsException("Same seqnum in list");
247                 }
248                 if(seqnum_higher(s, seqnum)){
249                         break;
250                 }
251         }
252         // If we're at the end of the list, add the packet to the
253         // end of the list
254         if(i == m_list.end())
255         {
256                 m_list.push_back(p);
257                 // Done.
258                 return;
259         }
260         // Insert before i
261         m_list.insert_before(i, p);
262 }
263
264 void ReliablePacketBuffer::incrementTimeouts(float dtime)
265 {
266         core::list<BufferedPacket>::Iterator i;
267         i = m_list.begin();
268         for(; i != m_list.end(); i++){
269                 i->time += dtime;
270                 i->totaltime += dtime;
271         }
272 }
273
274 void ReliablePacketBuffer::resetTimedOuts(float timeout)
275 {
276         core::list<BufferedPacket>::Iterator i;
277         i = m_list.begin();
278         for(; i != m_list.end(); i++){
279                 if(i->time >= timeout)
280                         i->time = 0.0;
281         }
282 }
283
284 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
285 {
286         core::list<BufferedPacket>::Iterator i;
287         i = m_list.begin();
288         for(; i != m_list.end(); i++){
289                 if(i->totaltime >= timeout)
290                         return true;
291         }
292         return false;
293 }
294
295 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
296 {
297         core::list<BufferedPacket> timed_outs;
298         core::list<BufferedPacket>::Iterator i;
299         i = m_list.begin();
300         for(; i != m_list.end(); i++)
301         {
302                 if(i->time >= timeout)
303                         timed_outs.push_back(*i);
304         }
305         return timed_outs;
306 }
307
308 /*
309         IncomingSplitBuffer
310 */
311
312 IncomingSplitBuffer::~IncomingSplitBuffer()
313 {
314         core::map<u16, IncomingSplitPacket*>::Iterator i;
315         i = m_buf.getIterator();
316         for(; i.atEnd() == false; i++)
317         {
318                 delete i.getNode()->getValue();
319         }
320 }
321 /*
322         This will throw a GotSplitPacketException when a full
323         split packet is constructed.
324 */
325 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
326 {
327         u32 headersize = BASE_HEADER_SIZE + 7;
328         assert(p.data.getSize() >= headersize);
329         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
330         assert(type == TYPE_SPLIT);
331         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
332         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
333         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
334
335         // Add if doesn't exist
336         if(m_buf.find(seqnum) == NULL)
337         {
338                 IncomingSplitPacket *sp = new IncomingSplitPacket();
339                 sp->chunk_count = chunk_count;
340                 sp->reliable = reliable;
341                 m_buf[seqnum] = sp;
342         }
343         
344         IncomingSplitPacket *sp = m_buf[seqnum];
345         
346         // TODO: These errors should be thrown or something? Dunno.
347         if(chunk_count != sp->chunk_count)
348                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
349                                 <<" != sp->chunk_count="<<sp->chunk_count
350                                 <<std::endl;
351         if(reliable != sp->reliable)
352                 derr_con<<"Connection: WARNING: reliable="<<reliable
353                                 <<" != sp->reliable="<<sp->reliable
354                                 <<std::endl;
355
356         // If chunk already exists, cancel
357         if(sp->chunks.find(chunk_num) != NULL)
358                 throw AlreadyExistsException("Chunk already in buffer");
359         
360         // Cut chunk data out of packet
361         u32 chunkdatasize = p.data.getSize() - headersize;
362         SharedBuffer<u8> chunkdata(chunkdatasize);
363         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
364         
365         // Set chunk data in buffer
366         sp->chunks[chunk_num] = chunkdata;
367         
368         // If not all chunks are received, return empty buffer
369         if(sp->allReceived() == false)
370                 return SharedBuffer<u8>();
371
372         // Calculate total size
373         u32 totalsize = 0;
374         core::map<u16, SharedBuffer<u8> >::Iterator i;
375         i = sp->chunks.getIterator();
376         for(; i.atEnd() == false; i++)
377         {
378                 totalsize += i.getNode()->getValue().getSize();
379         }
380         
381         SharedBuffer<u8> fulldata(totalsize);
382
383         // Copy chunks to data buffer
384         u32 start = 0;
385         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
386                         chunk_i++)
387         {
388                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
389                 u16 chunkdatasize = buf.getSize();
390                 memcpy(&fulldata[start], *buf, chunkdatasize);
391                 start += chunkdatasize;;
392         }
393
394         // Remove sp from buffer
395         m_buf.remove(seqnum);
396         delete sp;
397
398         return fulldata;
399 }
400 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
401 {
402         core::list<u16> remove_queue;
403         core::map<u16, IncomingSplitPacket*>::Iterator i;
404         i = m_buf.getIterator();
405         for(; i.atEnd() == false; i++)
406         {
407                 IncomingSplitPacket *p = i.getNode()->getValue();
408                 // Reliable ones are not removed by timeout
409                 if(p->reliable == true)
410                         continue;
411                 p->time += dtime;
412                 if(p->time >= timeout)
413                         remove_queue.push_back(i.getNode()->getKey());
414         }
415         core::list<u16>::Iterator j;
416         j = remove_queue.begin();
417         for(; j != remove_queue.end(); j++)
418         {
419                 dout_con<<"NOTE: Removing timed out unreliable split packet"
420                                 <<std::endl;
421                 delete m_buf[*j];
422                 m_buf.remove(*j);
423         }
424 }
425
426 /*
427         Channel
428 */
429
430 Channel::Channel()
431 {
432         next_outgoing_seqnum = SEQNUM_INITIAL;
433         next_incoming_seqnum = SEQNUM_INITIAL;
434         next_outgoing_split_seqnum = SEQNUM_INITIAL;
435 }
436 Channel::~Channel()
437 {
438 }
439
440 /*
441         Peer
442 */
443
444 Peer::Peer(u16 a_id, Address a_address):
445         address(a_address),
446         id(a_id),
447         timeout_counter(0.0),
448         ping_timer(0.0),
449         resend_timeout(0.5),
450         avg_rtt(-1.0),
451         has_sent_with_id(false),
452         m_sendtime_accu(0),
453         m_max_packets_per_second(10),
454         m_num_sent(0),
455         m_max_num_sent(0)
456 {
457 }
458 Peer::~Peer()
459 {
460 }
461
462 void Peer::reportRTT(float rtt)
463 {
464         if(rtt >= 0.0){
465                 if(rtt < 0.01){
466                         if(m_max_packets_per_second < 100)
467                                 m_max_packets_per_second += 10;
468                 } else if(rtt < 0.2){
469                         if(m_max_packets_per_second < 100)
470                                 m_max_packets_per_second += 2;
471                 } else {
472                         m_max_packets_per_second *= 0.8;
473                         if(m_max_packets_per_second < 10)
474                                 m_max_packets_per_second = 10;
475                 }
476         }
477
478         if(rtt < -0.999)
479         {}
480         else if(avg_rtt < 0.0)
481                 avg_rtt = rtt;
482         else
483                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
484         
485         // Calculate resend_timeout
486
487         /*int reliable_count = 0;
488         for(int i=0; i<CHANNEL_COUNT; i++)
489         {
490                 reliable_count += channels[i].outgoing_reliables.size();
491         }
492         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
493                         * ((float)reliable_count * 1);*/
494         
495         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
496         if(timeout < RESEND_TIMEOUT_MIN)
497                 timeout = RESEND_TIMEOUT_MIN;
498         if(timeout > RESEND_TIMEOUT_MAX)
499                 timeout = RESEND_TIMEOUT_MAX;
500         resend_timeout = timeout;
501 }
502                                 
503 /*
504         Connection
505 */
506
507 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
508         m_protocol_id(protocol_id),
509         m_max_packet_size(max_packet_size),
510         m_timeout(timeout),
511         m_peer_id(0),
512         m_bc_peerhandler(NULL),
513         m_bc_receive_timeout(0),
514         m_indentation(0)
515 {
516         m_socket.setTimeoutMs(5);
517
518         Start();
519 }
520
521 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
522                 PeerHandler *peerhandler):
523         m_protocol_id(protocol_id),
524         m_max_packet_size(max_packet_size),
525         m_timeout(timeout),
526         m_peer_id(0),
527         m_bc_peerhandler(peerhandler),
528         m_bc_receive_timeout(0),
529         m_indentation(0)
530 {
531         m_socket.setTimeoutMs(5);
532
533         Start();
534 }
535
536
537 Connection::~Connection()
538 {
539         stop();
540 }
541
542 /* Internal stuff */
543
544 void * Connection::Thread()
545 {
546         ThreadStarted();
547         log_register_thread("Connection");
548
549         dout_con<<"Connection thread started"<<std::endl;
550         
551         u32 curtime = porting::getTimeMs();
552         u32 lasttime = curtime;
553
554         while(getRun())
555         {
556                 BEGIN_DEBUG_EXCEPTION_HANDLER
557                 
558                 lasttime = curtime;
559                 curtime = porting::getTimeMs();
560                 float dtime = (float)(curtime - lasttime) / 1000.;
561                 if(dtime > 0.1)
562                         dtime = 0.1;
563                 if(dtime < 0.0)
564                         dtime = 0.0;
565                 
566                 runTimeouts(dtime);
567
568                 while(m_command_queue.size() != 0){
569                         ConnectionCommand c = m_command_queue.pop_front();
570                         processCommand(c);
571                 }
572
573                 send(dtime);
574
575                 receive();
576                 
577                 END_DEBUG_EXCEPTION_HANDLER(derr_con);
578         }
579
580         return NULL;
581 }
582
583 void Connection::putEvent(ConnectionEvent &e)
584 {
585         assert(e.type != CONNEVENT_NONE);
586         m_event_queue.push_back(e);
587 }
588
589 void Connection::processCommand(ConnectionCommand &c)
590 {
591         switch(c.type){
592         case CONNCMD_NONE:
593                 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
594                 return;
595         case CONNCMD_SERVE:
596                 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
597                                 <<c.port<<std::endl;
598                 serve(c.port);
599                 return;
600         case CONNCMD_CONNECT:
601                 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
602                 connect(c.address);
603                 return;
604         case CONNCMD_DISCONNECT:
605                 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
606                 disconnect();
607                 return;
608         case CONNCMD_SEND:
609                 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
610                 send(c.peer_id, c.channelnum, c.data, c.reliable);
611                 return;
612         case CONNCMD_SEND_TO_ALL:
613                 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
614                 sendToAll(c.channelnum, c.data, c.reliable);
615                 return;
616         case CONNCMD_DELETE_PEER:
617                 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
618                 deletePeer(c.peer_id, false);
619                 return;
620         }
621 }
622
623 void Connection::send(float dtime)
624 {
625         for(core::map<u16, Peer*>::Iterator
626                         j = m_peers.getIterator();
627                         j.atEnd() == false; j++)
628         {
629                 Peer *peer = j.getNode()->getValue();
630                 peer->m_sendtime_accu += dtime;
631                 peer->m_num_sent = 0;
632                 peer->m_max_num_sent = peer->m_sendtime_accu *
633                                 peer->m_max_packets_per_second;
634         }
635         Queue<OutgoingPacket> postponed_packets;
636         while(m_outgoing_queue.size() != 0){
637                 OutgoingPacket packet = m_outgoing_queue.pop_front();
638                 Peer *peer = getPeerNoEx(packet.peer_id);
639                 if(!peer)
640                         continue;
641                 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
642                         postponed_packets.push_back(packet);
643                 } else if(peer->m_num_sent < peer->m_max_num_sent){
644                         rawSendAsPacket(packet.peer_id, packet.channelnum,
645                                         packet.data, packet.reliable);
646                         peer->m_num_sent++;
647                 } else {
648                         postponed_packets.push_back(packet);
649                 }
650         }
651         while(postponed_packets.size() != 0){
652                 m_outgoing_queue.push_back(postponed_packets.pop_front());
653         }
654         for(core::map<u16, Peer*>::Iterator
655                         j = m_peers.getIterator();
656                         j.atEnd() == false; j++)
657         {
658                 Peer *peer = j.getNode()->getValue();
659                 peer->m_sendtime_accu -= (float)peer->m_num_sent /
660                                 peer->m_max_packets_per_second;
661                 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
662                         peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
663         }
664 }
665
666 // Receive packets from the network and buffers and create ConnectionEvents
667 void Connection::receive()
668 {
669         u32 datasize = 100000;
670         // TODO: We can not know how many layers of header there are.
671         // For now, just assume there are no other than the base headers.
672         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
673         SharedBuffer<u8> packetdata(packet_maxsize);
674
675         bool single_wait_done = false;
676         
677         for(;;)
678         {
679         try{
680                 /* Check if some buffer has relevant data */
681                 {
682                         u16 peer_id;
683                         SharedBuffer<u8> resultdata;
684                         bool got = getFromBuffers(peer_id, resultdata);
685                         if(got){
686                                 ConnectionEvent e;
687                                 e.dataReceived(peer_id, resultdata);
688                                 putEvent(e);
689                                 continue;
690                         }
691                 }
692                 
693                 if(single_wait_done){
694                         if(m_socket.WaitData(0) == false)
695                                 break;
696                 }
697                 
698                 single_wait_done = true;
699
700                 Address sender;
701                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
702
703                 if(received_size < 0)
704                         break;
705                 if(received_size < BASE_HEADER_SIZE)
706                         continue;
707                 if(readU32(&packetdata[0]) != m_protocol_id)
708                         continue;
709                 
710                 u16 peer_id = readPeerId(*packetdata);
711                 u8 channelnum = readChannel(*packetdata);
712                 if(channelnum > CHANNEL_COUNT-1){
713                         PrintInfo(derr_con);
714                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
715                         throw InvalidIncomingDataException("Channel doesn't exist");
716                 }
717
718                 if(peer_id == PEER_ID_INEXISTENT)
719                 {
720                         /*
721                                 Somebody is trying to send stuff to us with no peer id.
722                                 
723                                 Check if the same address and port was added to our peer
724                                 list before.
725                                 Allow only entries that have has_sent_with_id==false.
726                         */
727
728                         core::map<u16, Peer*>::Iterator j;
729                         j = m_peers.getIterator();
730                         for(; j.atEnd() == false; j++)
731                         {
732                                 Peer *peer = j.getNode()->getValue();
733                                 if(peer->has_sent_with_id)
734                                         continue;
735                                 if(peer->address == sender)
736                                         break;
737                         }
738                         
739                         /*
740                                 If no peer was found with the same address and port,
741                                 we shall assume it is a new peer and create an entry.
742                         */
743                         if(j.atEnd())
744                         {
745                                 // Pass on to adding the peer
746                         }
747                         // Else: A peer was found.
748                         else
749                         {
750                                 Peer *peer = j.getNode()->getValue();
751                                 peer_id = peer->id;
752                                 PrintInfo(derr_con);
753                                 derr_con<<"WARNING: Assuming unknown peer to be "
754                                                 <<"peer_id="<<peer_id<<std::endl;
755                         }
756                 }
757                 
758                 /*
759                         The peer was not found in our lists. Add it.
760                 */
761                 if(peer_id == PEER_ID_INEXISTENT)
762                 {
763                         // Somebody wants to make a new connection
764
765                         // Get a unique peer id (2 or higher)
766                         u16 peer_id_new = 2;
767                         /*
768                                 Find an unused peer id
769                         */
770                         bool out_of_ids = false;
771                         for(;;)
772                         {
773                                 // Check if exists
774                                 if(m_peers.find(peer_id_new) == NULL)
775                                         break;
776                                 // Check for overflow
777                                 if(peer_id_new == 65535){
778                                         out_of_ids = true;
779                                         break;
780                                 }
781                                 peer_id_new++;
782                         }
783                         if(out_of_ids){
784                                 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
785                                 continue;
786                         }
787
788                         PrintInfo();
789                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
790                                         " giving peer_id="<<peer_id_new<<std::endl;
791
792                         // Create a peer
793                         Peer *peer = new Peer(peer_id_new, sender);
794                         m_peers.insert(peer->id, peer);
795                         
796                         // Create peer addition event
797                         ConnectionEvent e;
798                         e.peerAdded(peer_id_new, sender);
799                         putEvent(e);
800                         
801                         // Create CONTROL packet to tell the peer id to the new peer.
802                         SharedBuffer<u8> reply(4);
803                         writeU8(&reply[0], TYPE_CONTROL);
804                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
805                         writeU16(&reply[2], peer_id_new);
806                         sendAsPacket(peer_id_new, 0, reply, true);
807                         
808                         // We're now talking to a valid peer_id
809                         peer_id = peer_id_new;
810
811                         // Go on and process whatever it sent
812                 }
813
814                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
815
816                 if(node == NULL)
817                 {
818                         // Peer not found
819                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
820                         // and it is invalid.
821                         PrintInfo(derr_con);
822                         derr_con<<"Receive(): Peer not found"<<std::endl;
823                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
824                 }
825
826                 Peer *peer = node->getValue();
827
828                 // Validate peer address
829                 if(peer->address != sender)
830                 {
831                         PrintInfo(derr_con);
832                         derr_con<<"Peer "<<peer_id<<" sending from different address."
833                                         " Ignoring."<<std::endl;
834                         continue;
835                 }
836                 
837                 peer->timeout_counter = 0.0;
838
839                 Channel *channel = &(peer->channels[channelnum]);
840                 
841                 // Throw the received packet to channel->processPacket()
842
843                 // Make a new SharedBuffer from the data without the base headers
844                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
845                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
846                                 strippeddata.getSize());
847                 
848                 try{
849                         // Process it (the result is some data with no headers made by us)
850                         SharedBuffer<u8> resultdata = processPacket
851                                         (channel, strippeddata, peer_id, channelnum, false);
852                         
853                         PrintInfo();
854                         dout_con<<"ProcessPacket returned data of size "
855                                         <<resultdata.getSize()<<std::endl;
856                         
857                         if(datasize < resultdata.getSize())
858                                 throw InvalidIncomingDataException
859                                                 ("Buffer too small for received data");
860                         
861                         ConnectionEvent e;
862                         e.dataReceived(peer_id, resultdata);
863                         putEvent(e);
864                         continue;
865                 }catch(ProcessedSilentlyException &e){
866                 }
867         }catch(InvalidIncomingDataException &e){
868         }
869         catch(ProcessedSilentlyException &e){
870         }
871         } // for
872 }
873
874 void Connection::runTimeouts(float dtime)
875 {
876         core::list<u16> timeouted_peers;
877         core::map<u16, Peer*>::Iterator j;
878         j = m_peers.getIterator();
879         for(; j.atEnd() == false; j++)
880         {
881                 Peer *peer = j.getNode()->getValue();
882                 
883                 /*
884                         Check peer timeout
885                 */
886                 peer->timeout_counter += dtime;
887                 if(peer->timeout_counter > m_timeout)
888                 {
889                         PrintInfo(derr_con);
890                         derr_con<<"RunTimeouts(): Peer "<<peer->id
891                                         <<" has timed out."
892                                         <<" (source=peer->timeout_counter)"
893                                         <<std::endl;
894                         // Add peer to the list
895                         timeouted_peers.push_back(peer->id);
896                         // Don't bother going through the buffers of this one
897                         continue;
898                 }
899
900                 float resend_timeout = peer->resend_timeout;
901                 for(u16 i=0; i<CHANNEL_COUNT; i++)
902                 {
903                         core::list<BufferedPacket> timed_outs;
904                         core::list<BufferedPacket>::Iterator j;
905                         
906                         Channel *channel = &peer->channels[i];
907
908                         // Remove timed out incomplete unreliable split packets
909                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
910                         
911                         // Increment reliable packet times
912                         channel->outgoing_reliables.incrementTimeouts(dtime);
913
914                         // Check reliable packet total times, remove peer if
915                         // over timeout.
916                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
917                         {
918                                 PrintInfo(derr_con);
919                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
920                                                 <<" has timed out."
921                                                 <<" (source=reliable packet totaltime)"
922                                                 <<std::endl;
923                                 // Add peer to the to-be-removed list
924                                 timeouted_peers.push_back(peer->id);
925                                 goto nextpeer;
926                         }
927
928                         // Re-send timed out outgoing reliables
929                         
930                         timed_outs = channel->
931                                         outgoing_reliables.getTimedOuts(resend_timeout);
932
933                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
934
935                         j = timed_outs.begin();
936                         for(; j != timed_outs.end(); j++)
937                         {
938                                 u16 peer_id = readPeerId(*(j->data));
939                                 u8 channel = readChannel(*(j->data));
940                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
941
942                                 PrintInfo(derr_con);
943                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
944                                 j->address.print(&derr_con);
945                                 derr_con<<"(t/o="<<resend_timeout<<"): "
946                                                 <<"from_peer_id="<<peer_id
947                                                 <<", channel="<<((int)channel&0xff)
948                                                 <<", seqnum="<<seqnum
949                                                 <<std::endl;
950
951                                 rawSend(*j);
952
953                                 // Enlarge avg_rtt and resend_timeout:
954                                 // The rtt will be at least the timeout.
955                                 // NOTE: This won't affect the timeout of the next
956                                 // checked channel because it was cached.
957                                 peer->reportRTT(resend_timeout);
958                         }
959                 }
960                 
961                 /*
962                         Send pings
963                 */
964                 peer->ping_timer += dtime;
965                 if(peer->ping_timer >= 5.0)
966                 {
967                         // Create and send PING packet
968                         SharedBuffer<u8> data(2);
969                         writeU8(&data[0], TYPE_CONTROL);
970                         writeU8(&data[1], CONTROLTYPE_PING);
971                         rawSendAsPacket(peer->id, 0, data, true);
972
973                         peer->ping_timer = 0.0;
974                 }
975                 
976 nextpeer:
977                 continue;
978         }
979
980         // Remove timed out peers
981         core::list<u16>::Iterator i = timeouted_peers.begin();
982         for(; i != timeouted_peers.end(); i++)
983         {
984                 PrintInfo(derr_con);
985                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
986                 deletePeer(*i, true);
987         }
988 }
989
990 void Connection::serve(u16 port)
991 {
992         dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
993         m_socket.Bind(port);
994         m_peer_id = PEER_ID_SERVER;
995 }
996
997 void Connection::connect(Address address)
998 {
999         dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1000                         <<":"<<address.getPort()<<std::endl;
1001
1002         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1003         if(node != NULL){
1004                 throw ConnectionException("Already connected to a server");
1005         }
1006
1007         Peer *peer = new Peer(PEER_ID_SERVER, address);
1008         m_peers.insert(peer->id, peer);
1009
1010         // Create event
1011         ConnectionEvent e;
1012         e.peerAdded(peer->id, peer->address);
1013         putEvent(e);
1014         
1015         m_socket.Bind(0);
1016         
1017         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1018         m_peer_id = PEER_ID_INEXISTENT;
1019         SharedBuffer<u8> data(0);
1020         Send(PEER_ID_SERVER, 0, data, true);
1021 }
1022
1023 void Connection::disconnect()
1024 {
1025         dout_con<<getDesc()<<" disconnecting"<<std::endl;
1026
1027         // Create and send DISCO packet
1028         SharedBuffer<u8> data(2);
1029         writeU8(&data[0], TYPE_CONTROL);
1030         writeU8(&data[1], CONTROLTYPE_DISCO);
1031         
1032         // Send to all
1033         core::map<u16, Peer*>::Iterator j;
1034         j = m_peers.getIterator();
1035         for(; j.atEnd() == false; j++)
1036         {
1037                 Peer *peer = j.getNode()->getValue();
1038                 rawSendAsPacket(peer->id, 0, data, false);
1039         }
1040 }
1041
1042 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1043 {
1044         core::map<u16, Peer*>::Iterator j;
1045         j = m_peers.getIterator();
1046         for(; j.atEnd() == false; j++)
1047         {
1048                 Peer *peer = j.getNode()->getValue();
1049                 send(peer->id, channelnum, data, reliable);
1050         }
1051 }
1052
1053 void Connection::send(u16 peer_id, u8 channelnum,
1054                 SharedBuffer<u8> data, bool reliable)
1055 {
1056         dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1057
1058         assert(channelnum < CHANNEL_COUNT);
1059         
1060         Peer *peer = getPeerNoEx(peer_id);
1061         if(peer == NULL)
1062                 return;
1063         Channel *channel = &(peer->channels[channelnum]);
1064
1065         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1066         if(reliable)
1067                 chunksize_max -= RELIABLE_HEADER_SIZE;
1068
1069         core::list<SharedBuffer<u8> > originals;
1070         originals = makeAutoSplitPacket(data, chunksize_max,
1071                         channel->next_outgoing_split_seqnum);
1072         
1073         core::list<SharedBuffer<u8> >::Iterator i;
1074         i = originals.begin();
1075         for(; i != originals.end(); i++)
1076         {
1077                 SharedBuffer<u8> original = *i;
1078                 
1079                 sendAsPacket(peer_id, channelnum, original, reliable);
1080         }
1081 }
1082
1083 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1084                 SharedBuffer<u8> data, bool reliable)
1085 {
1086         OutgoingPacket packet(peer_id, channelnum, data, reliable);
1087         m_outgoing_queue.push_back(packet);
1088 }
1089
1090 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1091                 SharedBuffer<u8> data, bool reliable)
1092 {
1093         Peer *peer = getPeerNoEx(peer_id);
1094         if(!peer)
1095                 return;
1096         Channel *channel = &(peer->channels[channelnum]);
1097
1098         if(reliable)
1099         {
1100                 u16 seqnum = channel->next_outgoing_seqnum;
1101                 channel->next_outgoing_seqnum++;
1102
1103                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1104
1105                 // Add base headers and make a packet
1106                 BufferedPacket p = makePacket(peer->address, reliable,
1107                                 m_protocol_id, m_peer_id, channelnum);
1108                 
1109                 try{
1110                         // Buffer the packet
1111                         channel->outgoing_reliables.insert(p);
1112                 }
1113                 catch(AlreadyExistsException &e)
1114                 {
1115                         PrintInfo(derr_con);
1116                         derr_con<<"WARNING: Going to send a reliable packet "
1117                                         "seqnum="<<seqnum<<" that is already "
1118                                         "in outgoing buffer"<<std::endl;
1119                         //assert(0);
1120                 }
1121                 
1122                 // Send the packet
1123                 rawSend(p);
1124         }
1125         else
1126         {
1127                 // Add base headers and make a packet
1128                 BufferedPacket p = makePacket(peer->address, data,
1129                                 m_protocol_id, m_peer_id, channelnum);
1130
1131                 // Send the packet
1132                 rawSend(p);
1133         }
1134 }
1135
1136 void Connection::rawSend(const BufferedPacket &packet)
1137 {
1138         try{
1139                 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1140         } catch(SendFailedException &e){
1141                 derr_con<<"Connection::rawSend(): SendFailedException: "
1142                                 <<packet.address.serializeString()<<std::endl;
1143         }
1144 }
1145
1146 Peer* Connection::getPeer(u16 peer_id)
1147 {
1148         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1149
1150         if(node == NULL){
1151                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1152         }
1153
1154         // Error checking
1155         assert(node->getValue()->id == peer_id);
1156
1157         return node->getValue();
1158 }
1159
1160 Peer* Connection::getPeerNoEx(u16 peer_id)
1161 {
1162         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1163
1164         if(node == NULL){
1165                 return NULL;
1166         }
1167
1168         // Error checking
1169         assert(node->getValue()->id == peer_id);
1170
1171         return node->getValue();
1172 }
1173
1174 core::list<Peer*> Connection::getPeers()
1175 {
1176         core::list<Peer*> list;
1177         core::map<u16, Peer*>::Iterator j;
1178         j = m_peers.getIterator();
1179         for(; j.atEnd() == false; j++)
1180         {
1181                 Peer *peer = j.getNode()->getValue();
1182                 list.push_back(peer);
1183         }
1184         return list;
1185 }
1186
1187 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1188 {
1189         core::map<u16, Peer*>::Iterator j;
1190         j = m_peers.getIterator();
1191         for(; j.atEnd() == false; j++)
1192         {
1193                 Peer *peer = j.getNode()->getValue();
1194                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1195                 {
1196                         Channel *channel = &peer->channels[i];
1197                         SharedBuffer<u8> resultdata;
1198                         bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1199                         if(got){
1200                                 dst = resultdata;
1201                                 return true;
1202                         }
1203                 }
1204         }
1205         return false;
1206 }
1207
1208 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1209                 SharedBuffer<u8> &dst)
1210 {
1211         u16 firstseqnum = 0;
1212         // Clear old packets from start of buffer
1213         try{
1214         for(;;){
1215                 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1216                 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1217                         channel->incoming_reliables.popFirst();
1218                 else
1219                         break;
1220         }
1221         // This happens if all packets are old
1222         }catch(con::NotFoundException)
1223         {}
1224         
1225         if(channel->incoming_reliables.empty() == false)
1226         {
1227                 if(firstseqnum == channel->next_incoming_seqnum)
1228                 {
1229                         BufferedPacket p = channel->incoming_reliables.popFirst();
1230                         
1231                         peer_id = readPeerId(*p.data);
1232                         u8 channelnum = readChannel(*p.data);
1233                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1234
1235                         PrintInfo();
1236                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
1237                                         <<" seqnum="<<seqnum
1238                                         <<" peer_id="<<peer_id
1239                                         <<" channel="<<((int)channelnum&0xff)
1240                                         <<std::endl;
1241
1242                         channel->next_incoming_seqnum++;
1243                         
1244                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1245                         // Get out the inside packet and re-process it
1246                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1247                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1248
1249                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1250                         return true;
1251                 }
1252         }
1253         return false;
1254 }
1255
1256 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1257                 SharedBuffer<u8> packetdata, u16 peer_id,
1258                 u8 channelnum, bool reliable)
1259 {
1260         IndentationRaiser iraiser(&(m_indentation));
1261
1262         if(packetdata.getSize() < 1)
1263                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1264
1265         u8 type = readU8(&packetdata[0]);
1266         
1267         if(type == TYPE_CONTROL)
1268         {
1269                 if(packetdata.getSize() < 2)
1270                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
1271
1272                 u8 controltype = readU8(&packetdata[1]);
1273
1274                 if(controltype == CONTROLTYPE_ACK)
1275                 {
1276                         if(packetdata.getSize() < 4)
1277                                 throw InvalidIncomingDataException
1278                                                 ("packetdata.getSize() < 4 (ACK header size)");
1279
1280                         u16 seqnum = readU16(&packetdata[2]);
1281                         PrintInfo();
1282                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1283                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1284                                         <<", seqnum="<<seqnum<<std::endl;
1285
1286                         try{
1287                                 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1288                                 // Get round trip time
1289                                 float rtt = p.totaltime;
1290
1291                                 // Let peer calculate stuff according to it
1292                                 // (avg_rtt and resend_timeout)
1293                                 Peer *peer = getPeer(peer_id);
1294                                 peer->reportRTT(rtt);
1295
1296                                 //PrintInfo(dout_con);
1297                                 //dout_con<<"RTT = "<<rtt<<std::endl;
1298
1299                                 /*dout_con<<"OUTGOING: ";
1300                                 PrintInfo();
1301                                 channel->outgoing_reliables.print();
1302                                 dout_con<<std::endl;*/
1303                         }
1304                         catch(NotFoundException &e){
1305                                 PrintInfo(derr_con);
1306                                 derr_con<<"WARNING: ACKed packet not "
1307                                                 "in outgoing queue"
1308                                                 <<std::endl;
1309                         }
1310
1311                         throw ProcessedSilentlyException("Got an ACK");
1312                 }
1313                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1314                 {
1315                         if(packetdata.getSize() < 4)
1316                                 throw InvalidIncomingDataException
1317                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1318                         u16 peer_id_new = readU16(&packetdata[2]);
1319                         PrintInfo();
1320                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1321
1322                         if(GetPeerID() != PEER_ID_INEXISTENT)
1323                         {
1324                                 PrintInfo(derr_con);
1325                                 derr_con<<"WARNING: Not changing"
1326                                                 " existing peer id."<<std::endl;
1327                         }
1328                         else
1329                         {
1330                                 dout_con<<"changing."<<std::endl;
1331                                 SetPeerID(peer_id_new);
1332                         }
1333                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
1334                 }
1335                 else if(controltype == CONTROLTYPE_PING)
1336                 {
1337                         // Just ignore it, the incoming data already reset
1338                         // the timeout counter
1339                         PrintInfo();
1340                         dout_con<<"PING"<<std::endl;
1341                         throw ProcessedSilentlyException("Got a PING");
1342                 }
1343                 else if(controltype == CONTROLTYPE_DISCO)
1344                 {
1345                         // Just ignore it, the incoming data already reset
1346                         // the timeout counter
1347                         PrintInfo();
1348                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1349                         
1350                         if(deletePeer(peer_id, false) == false)
1351                         {
1352                                 PrintInfo(derr_con);
1353                                 derr_con<<"DISCO: Peer not found"<<std::endl;
1354                         }
1355
1356                         throw ProcessedSilentlyException("Got a DISCO");
1357                 }
1358                 else{
1359                         PrintInfo(derr_con);
1360                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1361                                         <<((int)controltype&0xff)<<std::endl;
1362                         throw InvalidIncomingDataException("Invalid control type");
1363                 }
1364         }
1365         else if(type == TYPE_ORIGINAL)
1366         {
1367                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1368                         throw InvalidIncomingDataException
1369                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1370                 PrintInfo();
1371                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1372                                 <<std::endl;
1373                 // Get the inside packet out and return it
1374                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1375                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1376                 return payload;
1377         }
1378         else if(type == TYPE_SPLIT)
1379         {
1380                 // We have to create a packet again for buffering
1381                 // This isn't actually too bad an idea.
1382                 BufferedPacket packet = makePacket(
1383                                 getPeer(peer_id)->address,
1384                                 packetdata,
1385                                 GetProtocolID(),
1386                                 peer_id,
1387                                 channelnum);
1388                 // Buffer the packet
1389                 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1390                 if(data.getSize() != 0)
1391                 {
1392                         PrintInfo();
1393                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1394                                         <<"size="<<data.getSize()<<std::endl;
1395                         return data;
1396                 }
1397                 PrintInfo();
1398                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1399                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1400         }
1401         else if(type == TYPE_RELIABLE)
1402         {
1403                 // Recursive reliable packets not allowed
1404                 assert(reliable == false);
1405
1406                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1407                         throw InvalidIncomingDataException
1408                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1409
1410                 u16 seqnum = readU16(&packetdata[1]);
1411
1412                 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1413                 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1414                 
1415                 PrintInfo();
1416                 if(is_future_packet)
1417                         dout_con<<"BUFFERING";
1418                 else if(is_old_packet)
1419                         dout_con<<"OLD";
1420                 else
1421                         dout_con<<"RECUR";
1422                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1423                                 <<" next="<<channel->next_incoming_seqnum;
1424                 dout_con<<" [sending CONTROLTYPE_ACK"
1425                                 " to peer_id="<<peer_id<<"]";
1426                 dout_con<<std::endl;
1427                 
1428                 //DEBUG
1429                 //assert(channel->incoming_reliables.size() < 100);
1430
1431                 // Send a CONTROLTYPE_ACK
1432                 SharedBuffer<u8> reply(4);
1433                 writeU8(&reply[0], TYPE_CONTROL);
1434                 writeU8(&reply[1], CONTROLTYPE_ACK);
1435                 writeU16(&reply[2], seqnum);
1436                 rawSendAsPacket(peer_id, channelnum, reply, false);
1437
1438                 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1439                 if(is_future_packet)
1440                 {
1441                         /*PrintInfo();
1442                         dout_con<<"Buffering reliable packet (seqnum="
1443                                         <<seqnum<<")"<<std::endl;*/
1444                         
1445                         // This one comes later, buffer it.
1446                         // Actually we have to make a packet to buffer one.
1447                         // Well, we have all the ingredients, so just do it.
1448                         BufferedPacket packet = makePacket(
1449                                         getPeer(peer_id)->address,
1450                                         packetdata,
1451                                         GetProtocolID(),
1452                                         peer_id,
1453                                         channelnum);
1454                         try{
1455                                 channel->incoming_reliables.insert(packet);
1456                                 
1457                                 /*PrintInfo();
1458                                 dout_con<<"INCOMING: ";
1459                                 channel->incoming_reliables.print();
1460                                 dout_con<<std::endl;*/
1461                         }
1462                         catch(AlreadyExistsException &e)
1463                         {
1464                         }
1465
1466                         throw ProcessedSilentlyException("Buffered future reliable packet");
1467                 }
1468                 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1469                 else if(is_old_packet)
1470                 {
1471                         // An old packet, dump it
1472                         throw InvalidIncomingDataException("Got an old reliable packet");
1473                 }
1474
1475                 channel->next_incoming_seqnum++;
1476
1477                 // Get out the inside packet and re-process it
1478                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1479                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1480
1481                 return processPacket(channel, payload, peer_id, channelnum, true);
1482         }
1483         else
1484         {
1485                 PrintInfo(derr_con);
1486                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1487                 throw InvalidIncomingDataException("Invalid packet type");
1488         }
1489         
1490         // We should never get here.
1491         // If you get here, add an exception or a return to some of the
1492         // above conditionals.
1493         assert(0);
1494         throw BaseException("Error in Channel::ProcessPacket()");
1495 }
1496
1497 bool Connection::deletePeer(u16 peer_id, bool timeout)
1498 {
1499         if(m_peers.find(peer_id) == NULL)
1500                 return false;
1501         
1502         Peer *peer = m_peers[peer_id];
1503
1504         // Create event
1505         ConnectionEvent e;
1506         e.peerRemoved(peer_id, timeout, peer->address);
1507         putEvent(e);
1508
1509         delete m_peers[peer_id];
1510         m_peers.remove(peer_id);
1511         return true;
1512 }
1513
1514 /* Interface */
1515
1516 ConnectionEvent Connection::getEvent()
1517 {
1518         if(m_event_queue.size() == 0){
1519                 ConnectionEvent e;
1520                 e.type = CONNEVENT_NONE;
1521                 return e;
1522         }
1523         return m_event_queue.pop_front();
1524 }
1525
1526 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1527 {
1528         try{
1529                 return m_event_queue.pop_front(timeout_ms);
1530         } catch(ItemNotFoundException &ex){
1531                 ConnectionEvent e;
1532                 e.type = CONNEVENT_NONE;
1533                 return e;
1534         }
1535 }
1536
1537 void Connection::putCommand(ConnectionCommand &c)
1538 {
1539         m_command_queue.push_back(c);
1540 }
1541
1542 void Connection::Serve(unsigned short port)
1543 {
1544         ConnectionCommand c;
1545         c.serve(port);
1546         putCommand(c);
1547 }
1548
1549 void Connection::Connect(Address address)
1550 {
1551         ConnectionCommand c;
1552         c.connect(address);
1553         putCommand(c);
1554 }
1555
1556 bool Connection::Connected()
1557 {
1558         JMutexAutoLock peerlock(m_peers_mutex);
1559
1560         if(m_peers.size() != 1)
1561                 return false;
1562                 
1563         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1564         if(node == NULL)
1565                 return false;
1566         
1567         if(m_peer_id == PEER_ID_INEXISTENT)
1568                 return false;
1569         
1570         return true;
1571 }
1572
1573 void Connection::Disconnect()
1574 {
1575         ConnectionCommand c;
1576         c.disconnect();
1577         putCommand(c);
1578 }
1579
1580 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
1581 {
1582         for(;;){
1583                 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1584                 if(e.type != CONNEVENT_NONE)
1585                         dout_con<<getDesc()<<": Receive: got event: "
1586                                         <<e.describe()<<std::endl;
1587                 switch(e.type){
1588                 case CONNEVENT_NONE:
1589                         throw NoIncomingDataException("No incoming data");
1590                 case CONNEVENT_DATA_RECEIVED:
1591                         peer_id = e.peer_id;
1592                         memcpy(data, *e.data, e.data.getSize());
1593                         return e.data.getSize();
1594                 case CONNEVENT_PEER_ADDED: {
1595                         Peer tmp(e.peer_id, e.address);
1596                         if(m_bc_peerhandler)
1597                                 m_bc_peerhandler->peerAdded(&tmp);
1598                         continue; }
1599                 case CONNEVENT_PEER_REMOVED: {
1600                         Peer tmp(e.peer_id, e.address);
1601                         if(m_bc_peerhandler)
1602                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1603                         continue; }
1604                 }
1605         }
1606         throw NoIncomingDataException("No incoming data");
1607 }
1608
1609 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1610 {
1611         assert(channelnum < CHANNEL_COUNT);
1612
1613         ConnectionCommand c;
1614         c.sendToAll(channelnum, data, reliable);
1615         putCommand(c);
1616 }
1617
1618 void Connection::Send(u16 peer_id, u8 channelnum,
1619                 SharedBuffer<u8> data, bool reliable)
1620 {
1621         assert(channelnum < CHANNEL_COUNT);
1622
1623         ConnectionCommand c;
1624         c.send(peer_id, channelnum, data, reliable);
1625         putCommand(c);
1626 }
1627
1628 void Connection::RunTimeouts(float dtime)
1629 {
1630         // No-op
1631 }
1632
1633 Address Connection::GetPeerAddress(u16 peer_id)
1634 {
1635         JMutexAutoLock peerlock(m_peers_mutex);
1636         return getPeer(peer_id)->address;
1637 }
1638
1639 float Connection::GetPeerAvgRTT(u16 peer_id)
1640 {
1641         JMutexAutoLock peerlock(m_peers_mutex);
1642         return getPeer(peer_id)->avg_rtt;
1643 }
1644
1645 void Connection::DeletePeer(u16 peer_id)
1646 {
1647         ConnectionCommand c;
1648         c.deletePeer(peer_id);
1649         putCommand(c);
1650 }
1651
1652 void Connection::PrintInfo(std::ostream &out)
1653 {
1654         out<<getDesc()<<": ";
1655 }
1656
1657 void Connection::PrintInfo()
1658 {
1659         PrintInfo(dout_con);
1660 }
1661
1662 std::string Connection::getDesc()
1663 {
1664         return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
1665 }
1666
1667 } // namespace
1668