Re-add jungles, apple trees
[oweals/minetest.git] / src / connection.cpp
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23 #include "log.h"
24 #include "porting.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
28 #include "settings.h"
29
30 namespace con
31 {
32
33 static u16 readPeerId(u8 *packetdata)
34 {
35         return readU16(&packetdata[4]);
36 }
37 static u8 readChannel(u8 *packetdata)
38 {
39         return readU8(&packetdata[6]);
40 }
41
42 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
43                 u32 protocol_id, u16 sender_peer_id, u8 channel)
44 {
45         u32 packet_size = datasize + BASE_HEADER_SIZE;
46         BufferedPacket p(packet_size);
47         p.address = address;
48
49         writeU32(&p.data[0], protocol_id);
50         writeU16(&p.data[4], sender_peer_id);
51         writeU8(&p.data[6], channel);
52
53         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
54
55         return p;
56 }
57
58 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
59                 u32 protocol_id, u16 sender_peer_id, u8 channel)
60 {
61         return makePacket(address, *data, data.getSize(),
62                         protocol_id, sender_peer_id, channel);
63 }
64
65 SharedBuffer<u8> makeOriginalPacket(
66                 SharedBuffer<u8> data)
67 {
68         u32 header_size = 1;
69         u32 packet_size = data.getSize() + header_size;
70         SharedBuffer<u8> b(packet_size);
71
72         writeU8(&b[0], TYPE_ORIGINAL);
73
74         memcpy(&b[header_size], *data, data.getSize());
75
76         return b;
77 }
78
79 std::list<SharedBuffer<u8> > makeSplitPacket(
80                 SharedBuffer<u8> data,
81                 u32 chunksize_max,
82                 u16 seqnum)
83 {
84         // Chunk packets, containing the TYPE_SPLIT header
85         std::list<SharedBuffer<u8> > chunks;
86         
87         u32 chunk_header_size = 7;
88         u32 maximum_data_size = chunksize_max - chunk_header_size;
89         u32 start = 0;
90         u32 end = 0;
91         u32 chunk_num = 0;
92         u16 chunk_count = 0;
93         do{
94                 end = start + maximum_data_size - 1;
95                 if(end > data.getSize() - 1)
96                         end = data.getSize() - 1;
97                 
98                 u32 payload_size = end - start + 1;
99                 u32 packet_size = chunk_header_size + payload_size;
100
101                 SharedBuffer<u8> chunk(packet_size);
102                 
103                 writeU8(&chunk[0], TYPE_SPLIT);
104                 writeU16(&chunk[1], seqnum);
105                 // [3] u16 chunk_count is written at next stage
106                 writeU16(&chunk[5], chunk_num);
107                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
108
109                 chunks.push_back(chunk);
110                 chunk_count++;
111                 
112                 start = end + 1;
113                 chunk_num++;
114         }
115         while(end != data.getSize() - 1);
116
117         for(std::list<SharedBuffer<u8> >::iterator i = chunks.begin();
118                 i != chunks.end(); ++i)
119         {
120                 // Write chunk_count
121                 writeU16(&((*i)[3]), chunk_count);
122         }
123
124         return chunks;
125 }
126
127 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
128                 SharedBuffer<u8> data,
129                 u32 chunksize_max,
130                 u16 &split_seqnum)
131 {
132         u32 original_header_size = 1;
133         std::list<SharedBuffer<u8> > list;
134         if(data.getSize() + original_header_size > chunksize_max)
135         {
136                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
137                 split_seqnum++;
138                 return list;
139         }
140         else
141         {
142                 list.push_back(makeOriginalPacket(data));
143         }
144         return list;
145 }
146
147 SharedBuffer<u8> makeReliablePacket(
148                 SharedBuffer<u8> data,
149                 u16 seqnum)
150 {
151         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
152         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
153                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
154         u32 header_size = 3;
155         u32 packet_size = data.getSize() + header_size;
156         SharedBuffer<u8> b(packet_size);
157
158         writeU8(&b[0], TYPE_RELIABLE);
159         writeU16(&b[1], seqnum);
160
161         memcpy(&b[header_size], *data, data.getSize());
162
163         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
164                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
165         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
166         return b;
167 }
168
169 /*
170         ReliablePacketBuffer
171 */
172
173 ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {}
174
175 void ReliablePacketBuffer::print()
176 {
177         for(std::list<BufferedPacket>::iterator i = m_list.begin();
178                 i != m_list.end();
179                 ++i)
180         {
181                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
182                 dout_con<<s<<" ";
183         }
184 }
185 bool ReliablePacketBuffer::empty()
186 {
187         return m_list.empty();
188 }
189 u32 ReliablePacketBuffer::size()
190 {
191         return m_list_size;
192 }
193 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
194 {
195         std::list<BufferedPacket>::iterator i = m_list.begin();
196         for(; i != m_list.end(); ++i)
197         {
198                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
199                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
200                                 <<", comparing to s="<<s<<std::endl;*/
201                 if(s == seqnum)
202                         break;
203         }
204         return i;
205 }
206 RPBSearchResult ReliablePacketBuffer::notFound()
207 {
208         return m_list.end();
209 }
210 u16 ReliablePacketBuffer::getFirstSeqnum()
211 {
212         if(empty())
213                 throw NotFoundException("Buffer is empty");
214         BufferedPacket p = *m_list.begin();
215         return readU16(&p.data[BASE_HEADER_SIZE+1]);
216 }
217 BufferedPacket ReliablePacketBuffer::popFirst()
218 {
219         if(empty())
220                 throw NotFoundException("Buffer is empty");
221         BufferedPacket p = *m_list.begin();
222         m_list.erase(m_list.begin());
223         --m_list_size;
224         return p;
225 }
226 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
227 {
228         RPBSearchResult r = findPacket(seqnum);
229         if(r == notFound()){
230                 dout_con<<"Not found"<<std::endl;
231                 throw NotFoundException("seqnum not found in buffer");
232         }
233         BufferedPacket p = *r;
234         m_list.erase(r);
235         --m_list_size;
236         return p;
237 }
238 void ReliablePacketBuffer::insert(BufferedPacket &p)
239 {
240         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
241         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
242         assert(type == TYPE_RELIABLE);
243         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
244
245         ++m_list_size;
246         // Find the right place for the packet and insert it there
247
248         // If list is empty, just add it
249         if(m_list.empty())
250         {
251                 m_list.push_back(p);
252                 // Done.
253                 return;
254         }
255         // Otherwise find the right place
256         std::list<BufferedPacket>::iterator i = m_list.begin();
257         // Find the first packet in the list which has a higher seqnum
258         for(; i != m_list.end(); ++i){
259                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
260                 if(s == seqnum){
261                         --m_list_size;
262                         throw AlreadyExistsException("Same seqnum in list");
263                 }
264                 if(seqnum_higher(s, seqnum)){
265                         break;
266                 }
267         }
268         // If we're at the end of the list, add the packet to the
269         // end of the list
270         if(i == m_list.end())
271         {
272                 m_list.push_back(p);
273                 // Done.
274                 return;
275         }
276         // Insert before i
277         m_list.insert(i, p);
278 }
279
280 void ReliablePacketBuffer::incrementTimeouts(float dtime)
281 {
282         for(std::list<BufferedPacket>::iterator i = m_list.begin();
283                 i != m_list.end(); ++i)
284         {
285                 i->time += dtime;
286                 i->totaltime += dtime;
287         }
288 }
289
290 void ReliablePacketBuffer::resetTimedOuts(float timeout)
291 {
292         for(std::list<BufferedPacket>::iterator i = m_list.begin();
293                 i != m_list.end(); ++i)
294         {
295                 if(i->time >= timeout)
296                         i->time = 0.0;
297         }
298 }
299
300 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
301 {
302         for(std::list<BufferedPacket>::iterator i = m_list.begin();
303                 i != m_list.end(); ++i)
304         {
305                 if(i->totaltime >= timeout)
306                         return true;
307         }
308         return false;
309 }
310
311 std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
312 {
313         std::list<BufferedPacket> timed_outs;
314         for(std::list<BufferedPacket>::iterator i = m_list.begin();
315                 i != m_list.end(); ++i)
316         {
317                 if(i->time >= timeout)
318                         timed_outs.push_back(*i);
319         }
320         return timed_outs;
321 }
322
323 /*
324         IncomingSplitBuffer
325 */
326
327 IncomingSplitBuffer::~IncomingSplitBuffer()
328 {
329         for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
330                 i != m_buf.end(); ++i)
331         {
332                 delete i->second;
333         }
334 }
335 /*
336         This will throw a GotSplitPacketException when a full
337         split packet is constructed.
338 */
339 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
340 {
341         u32 headersize = BASE_HEADER_SIZE + 7;
342         assert(p.data.getSize() >= headersize);
343         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
344         assert(type == TYPE_SPLIT);
345         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
346         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
347         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
348
349         // Add if doesn't exist
350         if(m_buf.find(seqnum) == m_buf.end())
351         {
352                 IncomingSplitPacket *sp = new IncomingSplitPacket();
353                 sp->chunk_count = chunk_count;
354                 sp->reliable = reliable;
355                 m_buf[seqnum] = sp;
356         }
357         
358         IncomingSplitPacket *sp = m_buf[seqnum];
359         
360         // TODO: These errors should be thrown or something? Dunno.
361         if(chunk_count != sp->chunk_count)
362                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
363                                 <<" != sp->chunk_count="<<sp->chunk_count
364                                 <<std::endl;
365         if(reliable != sp->reliable)
366                 derr_con<<"Connection: WARNING: reliable="<<reliable
367                                 <<" != sp->reliable="<<sp->reliable
368                                 <<std::endl;
369
370         // If chunk already exists, ignore it.
371         // Sometimes two identical packets may arrive when there is network
372         // lag and the server re-sends stuff.
373         if(sp->chunks.find(chunk_num) != sp->chunks.end())
374                 return SharedBuffer<u8>();
375         
376         // Cut chunk data out of packet
377         u32 chunkdatasize = p.data.getSize() - headersize;
378         SharedBuffer<u8> chunkdata(chunkdatasize);
379         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
380         
381         // Set chunk data in buffer
382         sp->chunks[chunk_num] = chunkdata;
383         
384         // If not all chunks are received, return empty buffer
385         if(sp->allReceived() == false)
386                 return SharedBuffer<u8>();
387
388         // Calculate total size
389         u32 totalsize = 0;
390         for(std::map<u16, SharedBuffer<u8> >::iterator i = sp->chunks.begin();
391                 i != sp->chunks.end(); ++i)
392         {
393                 totalsize += i->second.getSize();
394         }
395         
396         SharedBuffer<u8> fulldata(totalsize);
397
398         // Copy chunks to data buffer
399         u32 start = 0;
400         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
401                         chunk_i++)
402         {
403                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
404                 u16 chunkdatasize = buf.getSize();
405                 memcpy(&fulldata[start], *buf, chunkdatasize);
406                 start += chunkdatasize;;
407         }
408
409         // Remove sp from buffer
410         m_buf.erase(seqnum);
411         delete sp;
412
413         return fulldata;
414 }
415 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
416 {
417         std::list<u16> remove_queue;
418         for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
419                 i != m_buf.end(); ++i)
420         {
421                 IncomingSplitPacket *p = i->second;
422                 // Reliable ones are not removed by timeout
423                 if(p->reliable == true)
424                         continue;
425                 p->time += dtime;
426                 if(p->time >= timeout)
427                         remove_queue.push_back(i->first);
428         }
429         for(std::list<u16>::iterator j = remove_queue.begin();
430                 j != remove_queue.end(); ++j)
431         {
432                 dout_con<<"NOTE: Removing timed out unreliable split packet"
433                                 <<std::endl;
434                 delete m_buf[*j];
435                 m_buf.erase(*j);
436         }
437 }
438
439 /*
440         Channel
441 */
442
443 Channel::Channel()
444 {
445         next_outgoing_seqnum = SEQNUM_INITIAL;
446         next_incoming_seqnum = SEQNUM_INITIAL;
447         next_outgoing_split_seqnum = SEQNUM_INITIAL;
448 }
449 Channel::~Channel()
450 {
451 }
452
453 /*
454         Peer
455 */
456
457 Peer::Peer(u16 a_id, Address a_address):
458         address(a_address),
459         id(a_id),
460         timeout_counter(0.0),
461         ping_timer(0.0),
462         resend_timeout(0.5),
463         avg_rtt(-1.0),
464         has_sent_with_id(false),
465         m_sendtime_accu(0),
466         m_max_packets_per_second(10),
467         m_num_sent(0),
468         m_max_num_sent(0),
469         congestion_control_aim_rtt(0.2),
470         congestion_control_max_rate(400),
471         congestion_control_min_rate(10)
472 {
473 }
474 Peer::~Peer()
475 {
476 }
477
478 void Peer::reportRTT(float rtt)
479 {
480         if(rtt >= 0.0){
481                 if(rtt < 0.01){
482                         if(m_max_packets_per_second < congestion_control_max_rate)
483                                 m_max_packets_per_second += 10;
484                 } else if(rtt < congestion_control_aim_rtt){
485                         if(m_max_packets_per_second < congestion_control_max_rate)
486                                 m_max_packets_per_second += 2;
487                 } else {
488                         m_max_packets_per_second *= 0.8;
489                         if(m_max_packets_per_second < congestion_control_min_rate)
490                                 m_max_packets_per_second = congestion_control_min_rate;
491                 }
492         }
493
494         if(rtt < -0.999)
495         {}
496         else if(avg_rtt < 0.0)
497                 avg_rtt = rtt;
498         else
499                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
500         
501         // Calculate resend_timeout
502
503         /*int reliable_count = 0;
504         for(int i=0; i<CHANNEL_COUNT; i++)
505         {
506                 reliable_count += channels[i].outgoing_reliables.size();
507         }
508         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
509                         * ((float)reliable_count * 1);*/
510         
511         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
512         if(timeout < RESEND_TIMEOUT_MIN)
513                 timeout = RESEND_TIMEOUT_MIN;
514         if(timeout > RESEND_TIMEOUT_MAX)
515                 timeout = RESEND_TIMEOUT_MAX;
516         resend_timeout = timeout;
517 }
518                                 
519 /*
520         Connection
521 */
522
523 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
524         m_protocol_id(protocol_id),
525         m_max_packet_size(max_packet_size),
526         m_timeout(timeout),
527         m_peer_id(0),
528         m_bc_peerhandler(NULL),
529         m_bc_receive_timeout(0),
530         m_indentation(0)
531 {
532         m_socket.setTimeoutMs(5);
533
534         Start();
535 }
536
537 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
538                 PeerHandler *peerhandler):
539         m_protocol_id(protocol_id),
540         m_max_packet_size(max_packet_size),
541         m_timeout(timeout),
542         m_peer_id(0),
543         m_bc_peerhandler(peerhandler),
544         m_bc_receive_timeout(0),
545         m_indentation(0)
546 {
547         m_socket.setTimeoutMs(5);
548
549         Start();
550 }
551
552
553 Connection::~Connection()
554 {
555         stop();
556         // Delete peers
557         for(std::map<u16, Peer*>::iterator
558                         j = m_peers.begin();
559                         j != m_peers.end(); ++j)
560         {
561                 delete j->second;
562         }
563 }
564
565 /* Internal stuff */
566
567 void * Connection::Thread()
568 {
569         ThreadStarted();
570         log_register_thread("Connection");
571
572         dout_con<<"Connection thread started"<<std::endl;
573         
574         u32 curtime = porting::getTimeMs();
575         u32 lasttime = curtime;
576
577         while(getRun())
578         {
579                 BEGIN_DEBUG_EXCEPTION_HANDLER
580                 
581                 lasttime = curtime;
582                 curtime = porting::getTimeMs();
583                 float dtime = (float)(curtime - lasttime) / 1000.;
584                 if(dtime > 0.1)
585                         dtime = 0.1;
586                 if(dtime < 0.0)
587                         dtime = 0.0;
588                 
589                 runTimeouts(dtime);
590
591                 while(!m_command_queue.empty()){
592                         ConnectionCommand c = m_command_queue.pop_front();
593                         processCommand(c);
594                 }
595
596                 send(dtime);
597
598                 receive();
599                 
600                 END_DEBUG_EXCEPTION_HANDLER(derr_con);
601         }
602
603         return NULL;
604 }
605
606 void Connection::putEvent(ConnectionEvent &e)
607 {
608         assert(e.type != CONNEVENT_NONE);
609         m_event_queue.push_back(e);
610 }
611
612 void Connection::processCommand(ConnectionCommand &c)
613 {
614         switch(c.type){
615         case CONNCMD_NONE:
616                 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
617                 return;
618         case CONNCMD_SERVE:
619                 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
620                                 <<c.port<<std::endl;
621                 serve(c.port);
622                 return;
623         case CONNCMD_CONNECT:
624                 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
625                 connect(c.address);
626                 return;
627         case CONNCMD_DISCONNECT:
628                 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
629                 disconnect();
630                 return;
631         case CONNCMD_SEND:
632                 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
633                 send(c.peer_id, c.channelnum, c.data, c.reliable);
634                 return;
635         case CONNCMD_SEND_TO_ALL:
636                 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
637                 sendToAll(c.channelnum, c.data, c.reliable);
638                 return;
639         case CONNCMD_DELETE_PEER:
640                 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
641                 deletePeer(c.peer_id, false);
642                 return;
643         }
644 }
645
646 void Connection::send(float dtime)
647 {
648         for(std::map<u16, Peer*>::iterator
649                         j = m_peers.begin();
650                         j != m_peers.end(); ++j)
651         {
652                 Peer *peer = j->second;
653                 peer->m_sendtime_accu += dtime;
654                 peer->m_num_sent = 0;
655                 peer->m_max_num_sent = peer->m_sendtime_accu *
656                                 peer->m_max_packets_per_second;
657         }
658         Queue<OutgoingPacket> postponed_packets;
659         while(!m_outgoing_queue.empty()){
660                 OutgoingPacket packet = m_outgoing_queue.pop_front();
661                 Peer *peer = getPeerNoEx(packet.peer_id);
662                 if(!peer)
663                         continue;
664                 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
665                         postponed_packets.push_back(packet);
666                 } else if(peer->m_num_sent < peer->m_max_num_sent){
667                         rawSendAsPacket(packet.peer_id, packet.channelnum,
668                                         packet.data, packet.reliable);
669                         peer->m_num_sent++;
670                 } else {
671                         postponed_packets.push_back(packet);
672                 }
673         }
674         while(!postponed_packets.empty()){
675                 m_outgoing_queue.push_back(postponed_packets.pop_front());
676         }
677         for(std::map<u16, Peer*>::iterator
678                         j = m_peers.begin();
679                         j != m_peers.end(); ++j)
680         {
681                 Peer *peer = j->second;
682                 peer->m_sendtime_accu -= (float)peer->m_num_sent /
683                                 peer->m_max_packets_per_second;
684                 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
685                         peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
686         }
687 }
688
689 // Receive packets from the network and buffers and create ConnectionEvents
690 void Connection::receive()
691 {
692         u32 datasize = m_max_packet_size * 2;  // Double it just to be safe
693         // TODO: We can not know how many layers of header there are.
694         // For now, just assume there are no other than the base headers.
695         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
696         SharedBuffer<u8> packetdata(packet_maxsize);
697
698         bool single_wait_done = false;
699         
700         for(;;)
701         {
702         try{
703                 /* Check if some buffer has relevant data */
704                 {
705                         u16 peer_id;
706                         SharedBuffer<u8> resultdata;
707                         bool got = getFromBuffers(peer_id, resultdata);
708                         if(got){
709                                 ConnectionEvent e;
710                                 e.dataReceived(peer_id, resultdata);
711                                 putEvent(e);
712                                 continue;
713                         }
714                 }
715                 
716                 if(single_wait_done){
717                         if(m_socket.WaitData(0) == false)
718                                 break;
719                 }
720                 
721                 single_wait_done = true;
722
723                 Address sender;
724                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
725
726                 if(received_size < 0)
727                         break;
728                 if(received_size < BASE_HEADER_SIZE)
729                         continue;
730                 if(readU32(&packetdata[0]) != m_protocol_id)
731                         continue;
732                 
733                 u16 peer_id = readPeerId(*packetdata);
734                 u8 channelnum = readChannel(*packetdata);
735                 if(channelnum > CHANNEL_COUNT-1){
736                         PrintInfo(derr_con);
737                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
738                         throw InvalidIncomingDataException("Channel doesn't exist");
739                 }
740
741                 if(peer_id == PEER_ID_INEXISTENT)
742                 {
743                         /*
744                                 Somebody is trying to send stuff to us with no peer id.
745                                 
746                                 Check if the same address and port was added to our peer
747                                 list before.
748                                 Allow only entries that have has_sent_with_id==false.
749                         */
750
751                         std::map<u16, Peer*>::iterator j;
752                         j = m_peers.begin();
753                         for(; j != m_peers.end(); ++j)
754                         {
755                                 Peer *peer = j->second;
756                                 if(peer->has_sent_with_id)
757                                         continue;
758                                 if(peer->address == sender)
759                                         break;
760                         }
761                         
762                         /*
763                                 If no peer was found with the same address and port,
764                                 we shall assume it is a new peer and create an entry.
765                         */
766                         if(j == m_peers.end())
767                         {
768                                 // Pass on to adding the peer
769                         }
770                         // Else: A peer was found.
771                         else
772                         {
773                                 Peer *peer = j->second;
774                                 peer_id = peer->id;
775                                 PrintInfo(derr_con);
776                                 derr_con<<"WARNING: Assuming unknown peer to be "
777                                                 <<"peer_id="<<peer_id<<std::endl;
778                         }
779                 }
780                 
781                 /*
782                         The peer was not found in our lists. Add it.
783                 */
784                 if(peer_id == PEER_ID_INEXISTENT)
785                 {
786                         // Somebody wants to make a new connection
787
788                         // Get a unique peer id (2 or higher)
789                         u16 peer_id_new = 2;
790                         /*
791                                 Find an unused peer id
792                         */
793                         bool out_of_ids = false;
794                         for(;;)
795                         {
796                                 // Check if exists
797                                 if(m_peers.find(peer_id_new) == m_peers.end())
798                                         break;
799                                 // Check for overflow
800                                 if(peer_id_new == 65535){
801                                         out_of_ids = true;
802                                         break;
803                                 }
804                                 peer_id_new++;
805                         }
806                         if(out_of_ids){
807                                 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
808                                 continue;
809                         }
810
811                         PrintInfo();
812                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
813                                         " giving peer_id="<<peer_id_new<<std::endl;
814
815                         // Create a peer
816                         Peer *peer = new Peer(peer_id_new, sender);
817                         m_peers[peer->id] = peer;
818                         
819                         // Create peer addition event
820                         ConnectionEvent e;
821                         e.peerAdded(peer_id_new, sender);
822                         putEvent(e);
823                         
824                         // Create CONTROL packet to tell the peer id to the new peer.
825                         SharedBuffer<u8> reply(4);
826                         writeU8(&reply[0], TYPE_CONTROL);
827                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
828                         writeU16(&reply[2], peer_id_new);
829                         sendAsPacket(peer_id_new, 0, reply, true);
830                         
831                         // We're now talking to a valid peer_id
832                         peer_id = peer_id_new;
833
834                         // Go on and process whatever it sent
835                 }
836
837                 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
838
839                 if(node == m_peers.end())
840                 {
841                         // Peer not found
842                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
843                         // and it is invalid.
844                         PrintInfo(derr_con);
845                         derr_con<<"Receive(): Peer not found"<<std::endl;
846                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
847                 }
848
849                 Peer *peer = node->second;
850
851                 // Validate peer address
852                 if(peer->address != sender)
853                 {
854                         PrintInfo(derr_con);
855                         derr_con<<"Peer "<<peer_id<<" sending from different address."
856                                         " Ignoring."<<std::endl;
857                         continue;
858                 }
859                 
860                 peer->timeout_counter = 0.0;
861
862                 Channel *channel = &(peer->channels[channelnum]);
863                 
864                 // Throw the received packet to channel->processPacket()
865
866                 // Make a new SharedBuffer from the data without the base headers
867                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
868                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
869                                 strippeddata.getSize());
870                 
871                 try{
872                         // Process it (the result is some data with no headers made by us)
873                         SharedBuffer<u8> resultdata = processPacket
874                                         (channel, strippeddata, peer_id, channelnum, false);
875                         
876                         PrintInfo();
877                         dout_con<<"ProcessPacket returned data of size "
878                                         <<resultdata.getSize()<<std::endl;
879                         
880                         ConnectionEvent e;
881                         e.dataReceived(peer_id, resultdata);
882                         putEvent(e);
883                         continue;
884                 }catch(ProcessedSilentlyException &e){
885                 }
886         }catch(InvalidIncomingDataException &e){
887         }
888         catch(ProcessedSilentlyException &e){
889         }
890         } // for
891 }
892
893 void Connection::runTimeouts(float dtime)
894 {
895         float congestion_control_aim_rtt
896                         = g_settings->getFloat("congestion_control_aim_rtt");
897         float congestion_control_max_rate
898                         = g_settings->getFloat("congestion_control_max_rate");
899         float congestion_control_min_rate
900                         = g_settings->getFloat("congestion_control_min_rate");
901
902         std::list<u16> timeouted_peers;
903         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
904                 j != m_peers.end(); ++j)
905         {
906                 Peer *peer = j->second;
907
908                 // Update congestion control values
909                 peer->congestion_control_aim_rtt = congestion_control_aim_rtt;
910                 peer->congestion_control_max_rate = congestion_control_max_rate;
911                 peer->congestion_control_min_rate = congestion_control_min_rate;
912                 
913                 /*
914                         Check peer timeout
915                 */
916                 peer->timeout_counter += dtime;
917                 if(peer->timeout_counter > m_timeout)
918                 {
919                         PrintInfo(derr_con);
920                         derr_con<<"RunTimeouts(): Peer "<<peer->id
921                                         <<" has timed out."
922                                         <<" (source=peer->timeout_counter)"
923                                         <<std::endl;
924                         // Add peer to the list
925                         timeouted_peers.push_back(peer->id);
926                         // Don't bother going through the buffers of this one
927                         continue;
928                 }
929
930                 float resend_timeout = peer->resend_timeout;
931                 for(u16 i=0; i<CHANNEL_COUNT; i++)
932                 {
933                         std::list<BufferedPacket> timed_outs;
934                         
935                         Channel *channel = &peer->channels[i];
936
937                         // Remove timed out incomplete unreliable split packets
938                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
939                         
940                         // Increment reliable packet times
941                         channel->outgoing_reliables.incrementTimeouts(dtime);
942
943                         // Check reliable packet total times, remove peer if
944                         // over timeout.
945                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
946                         {
947                                 PrintInfo(derr_con);
948                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
949                                                 <<" has timed out."
950                                                 <<" (source=reliable packet totaltime)"
951                                                 <<std::endl;
952                                 // Add peer to the to-be-removed list
953                                 timeouted_peers.push_back(peer->id);
954                                 goto nextpeer;
955                         }
956
957                         // Re-send timed out outgoing reliables
958                         
959                         timed_outs = channel->
960                                         outgoing_reliables.getTimedOuts(resend_timeout);
961
962                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
963
964                         for(std::list<BufferedPacket>::iterator j = timed_outs.begin();
965                                 j != timed_outs.end(); ++j)
966                         {
967                                 u16 peer_id = readPeerId(*(j->data));
968                                 u8 channel = readChannel(*(j->data));
969                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
970
971                                 PrintInfo(derr_con);
972                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
973                                 j->address.print(&derr_con);
974                                 derr_con<<"(t/o="<<resend_timeout<<"): "
975                                                 <<"from_peer_id="<<peer_id
976                                                 <<", channel="<<((int)channel&0xff)
977                                                 <<", seqnum="<<seqnum
978                                                 <<std::endl;
979
980                                 rawSend(*j);
981
982                                 // Enlarge avg_rtt and resend_timeout:
983                                 // The rtt will be at least the timeout.
984                                 // NOTE: This won't affect the timeout of the next
985                                 // checked channel because it was cached.
986                                 peer->reportRTT(resend_timeout);
987                         }
988                 }
989                 
990                 /*
991                         Send pings
992                 */
993                 peer->ping_timer += dtime;
994                 if(peer->ping_timer >= 5.0)
995                 {
996                         // Create and send PING packet
997                         SharedBuffer<u8> data(2);
998                         writeU8(&data[0], TYPE_CONTROL);
999                         writeU8(&data[1], CONTROLTYPE_PING);
1000                         rawSendAsPacket(peer->id, 0, data, true);
1001
1002                         peer->ping_timer = 0.0;
1003                 }
1004                 
1005 nextpeer:
1006                 continue;
1007         }
1008
1009         // Remove timed out peers
1010         for(std::list<u16>::iterator i = timeouted_peers.begin();
1011                 i != timeouted_peers.end(); ++i)
1012         {
1013                 PrintInfo(derr_con);
1014                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1015                 deletePeer(*i, true);
1016         }
1017 }
1018
1019 void Connection::serve(u16 port)
1020 {
1021         dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1022         try{
1023                 m_socket.Bind(port);
1024                 m_peer_id = PEER_ID_SERVER;
1025         }
1026         catch(SocketException &e){
1027                 // Create event
1028                 ConnectionEvent ce;
1029                 ce.bindFailed();
1030                 putEvent(ce);
1031         }
1032 }
1033
1034 void Connection::connect(Address address)
1035 {
1036         dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1037                         <<":"<<address.getPort()<<std::endl;
1038
1039         std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1040         if(node != m_peers.end()){
1041                 throw ConnectionException("Already connected to a server");
1042         }
1043
1044         Peer *peer = new Peer(PEER_ID_SERVER, address);
1045         m_peers[peer->id] = peer;
1046
1047         // Create event
1048         ConnectionEvent e;
1049         e.peerAdded(peer->id, peer->address);
1050         putEvent(e);
1051         
1052         m_socket.Bind(0);
1053         
1054         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1055         m_peer_id = PEER_ID_INEXISTENT;
1056         SharedBuffer<u8> data(0);
1057         Send(PEER_ID_SERVER, 0, data, true);
1058 }
1059
1060 void Connection::disconnect()
1061 {
1062         dout_con<<getDesc()<<" disconnecting"<<std::endl;
1063
1064         // Create and send DISCO packet
1065         SharedBuffer<u8> data(2);
1066         writeU8(&data[0], TYPE_CONTROL);
1067         writeU8(&data[1], CONTROLTYPE_DISCO);
1068         
1069         // Send to all
1070         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1071                 j != m_peers.end(); ++j)
1072         {
1073                 Peer *peer = j->second;
1074                 rawSendAsPacket(peer->id, 0, data, false);
1075         }
1076 }
1077
1078 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1079 {
1080         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1081                 j != m_peers.end(); ++j)
1082         {
1083                 Peer *peer = j->second;
1084                 send(peer->id, channelnum, data, reliable);
1085         }
1086 }
1087
1088 void Connection::send(u16 peer_id, u8 channelnum,
1089                 SharedBuffer<u8> data, bool reliable)
1090 {
1091         dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1092
1093         assert(channelnum < CHANNEL_COUNT);
1094         
1095         Peer *peer = getPeerNoEx(peer_id);
1096         if(peer == NULL)
1097                 return;
1098         Channel *channel = &(peer->channels[channelnum]);
1099
1100         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1101         if(reliable)
1102                 chunksize_max -= RELIABLE_HEADER_SIZE;
1103
1104         std::list<SharedBuffer<u8> > originals;
1105         originals = makeAutoSplitPacket(data, chunksize_max,
1106                         channel->next_outgoing_split_seqnum);
1107         
1108         for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
1109                 i != originals.end(); ++i)
1110         {
1111                 SharedBuffer<u8> original = *i;
1112                 
1113                 sendAsPacket(peer_id, channelnum, original, reliable);
1114         }
1115 }
1116
1117 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1118                 SharedBuffer<u8> data, bool reliable)
1119 {
1120         OutgoingPacket packet(peer_id, channelnum, data, reliable);
1121         m_outgoing_queue.push_back(packet);
1122 }
1123
1124 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1125                 SharedBuffer<u8> data, bool reliable)
1126 {
1127         Peer *peer = getPeerNoEx(peer_id);
1128         if(!peer)
1129                 return;
1130         Channel *channel = &(peer->channels[channelnum]);
1131
1132         if(reliable)
1133         {
1134                 u16 seqnum = channel->next_outgoing_seqnum;
1135                 channel->next_outgoing_seqnum++;
1136
1137                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1138
1139                 // Add base headers and make a packet
1140                 BufferedPacket p = makePacket(peer->address, reliable,
1141                                 m_protocol_id, m_peer_id, channelnum);
1142                 
1143                 try{
1144                         // Buffer the packet
1145                         channel->outgoing_reliables.insert(p);
1146                 }
1147                 catch(AlreadyExistsException &e)
1148                 {
1149                         PrintInfo(derr_con);
1150                         derr_con<<"WARNING: Going to send a reliable packet "
1151                                         "seqnum="<<seqnum<<" that is already "
1152                                         "in outgoing buffer"<<std::endl;
1153                         //assert(0);
1154                 }
1155                 
1156                 // Send the packet
1157                 rawSend(p);
1158         }
1159         else
1160         {
1161                 // Add base headers and make a packet
1162                 BufferedPacket p = makePacket(peer->address, data,
1163                                 m_protocol_id, m_peer_id, channelnum);
1164
1165                 // Send the packet
1166                 rawSend(p);
1167         }
1168 }
1169
1170 void Connection::rawSend(const BufferedPacket &packet)
1171 {
1172         try{
1173                 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1174         } catch(SendFailedException &e){
1175                 derr_con<<"Connection::rawSend(): SendFailedException: "
1176                                 <<packet.address.serializeString()<<std::endl;
1177         }
1178 }
1179
1180 Peer* Connection::getPeer(u16 peer_id)
1181 {
1182         std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1183
1184         if(node == m_peers.end()){
1185                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1186         }
1187
1188         // Error checking
1189         assert(node->second->id == peer_id);
1190
1191         return node->second;
1192 }
1193
1194 Peer* Connection::getPeerNoEx(u16 peer_id)
1195 {
1196         std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1197
1198         if(node == m_peers.end()){
1199                 return NULL;
1200         }
1201
1202         // Error checking
1203         assert(node->second->id == peer_id);
1204
1205         return node->second;
1206 }
1207
1208 std::list<Peer*> Connection::getPeers()
1209 {
1210         std::list<Peer*> list;
1211         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1212                 j != m_peers.end(); ++j)
1213         {
1214                 Peer *peer = j->second;
1215                 list.push_back(peer);
1216         }
1217         return list;
1218 }
1219
1220 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1221 {
1222         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1223                 j != m_peers.end(); ++j)
1224         {
1225                 Peer *peer = j->second;
1226                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1227                 {
1228                         Channel *channel = &peer->channels[i];
1229                         SharedBuffer<u8> resultdata;
1230                         bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1231                         if(got){
1232                                 dst = resultdata;
1233                                 return true;
1234                         }
1235                 }
1236         }
1237         return false;
1238 }
1239
1240 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1241                 SharedBuffer<u8> &dst)
1242 {
1243         u16 firstseqnum = 0;
1244         // Clear old packets from start of buffer
1245         try{
1246         for(;;){
1247                 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1248                 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1249                         channel->incoming_reliables.popFirst();
1250                 else
1251                         break;
1252         }
1253         // This happens if all packets are old
1254         }catch(con::NotFoundException)
1255         {}
1256         
1257         if(channel->incoming_reliables.empty() == false)
1258         {
1259                 if(firstseqnum == channel->next_incoming_seqnum)
1260                 {
1261                         BufferedPacket p = channel->incoming_reliables.popFirst();
1262                         
1263                         peer_id = readPeerId(*p.data);
1264                         u8 channelnum = readChannel(*p.data);
1265                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1266
1267                         PrintInfo();
1268                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
1269                                         <<" seqnum="<<seqnum
1270                                         <<" peer_id="<<peer_id
1271                                         <<" channel="<<((int)channelnum&0xff)
1272                                         <<std::endl;
1273
1274                         channel->next_incoming_seqnum++;
1275                         
1276                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1277                         // Get out the inside packet and re-process it
1278                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1279                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1280
1281                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1282                         return true;
1283                 }
1284         }
1285         return false;
1286 }
1287
1288 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1289                 SharedBuffer<u8> packetdata, u16 peer_id,
1290                 u8 channelnum, bool reliable)
1291 {
1292         IndentationRaiser iraiser(&(m_indentation));
1293
1294         if(packetdata.getSize() < 1)
1295                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1296
1297         u8 type = readU8(&packetdata[0]);
1298         
1299         if(type == TYPE_CONTROL)
1300         {
1301                 if(packetdata.getSize() < 2)
1302                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
1303
1304                 u8 controltype = readU8(&packetdata[1]);
1305
1306                 if(controltype == CONTROLTYPE_ACK)
1307                 {
1308                         if(packetdata.getSize() < 4)
1309                                 throw InvalidIncomingDataException
1310                                                 ("packetdata.getSize() < 4 (ACK header size)");
1311
1312                         u16 seqnum = readU16(&packetdata[2]);
1313                         PrintInfo();
1314                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1315                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1316                                         <<", seqnum="<<seqnum<<std::endl;
1317
1318                         try{
1319                                 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1320                                 // Get round trip time
1321                                 float rtt = p.totaltime;
1322
1323                                 // Let peer calculate stuff according to it
1324                                 // (avg_rtt and resend_timeout)
1325                                 Peer *peer = getPeer(peer_id);
1326                                 peer->reportRTT(rtt);
1327
1328                                 //PrintInfo(dout_con);
1329                                 //dout_con<<"RTT = "<<rtt<<std::endl;
1330
1331                                 /*dout_con<<"OUTGOING: ";
1332                                 PrintInfo();
1333                                 channel->outgoing_reliables.print();
1334                                 dout_con<<std::endl;*/
1335                         }
1336                         catch(NotFoundException &e){
1337                                 PrintInfo(derr_con);
1338                                 derr_con<<"WARNING: ACKed packet not "
1339                                                 "in outgoing queue"
1340                                                 <<std::endl;
1341                         }
1342
1343                         throw ProcessedSilentlyException("Got an ACK");
1344                 }
1345                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1346                 {
1347                         if(packetdata.getSize() < 4)
1348                                 throw InvalidIncomingDataException
1349                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1350                         u16 peer_id_new = readU16(&packetdata[2]);
1351                         PrintInfo();
1352                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1353
1354                         if(GetPeerID() != PEER_ID_INEXISTENT)
1355                         {
1356                                 PrintInfo(derr_con);
1357                                 derr_con<<"WARNING: Not changing"
1358                                                 " existing peer id."<<std::endl;
1359                         }
1360                         else
1361                         {
1362                                 dout_con<<"changing."<<std::endl;
1363                                 SetPeerID(peer_id_new);
1364                         }
1365                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
1366                 }
1367                 else if(controltype == CONTROLTYPE_PING)
1368                 {
1369                         // Just ignore it, the incoming data already reset
1370                         // the timeout counter
1371                         PrintInfo();
1372                         dout_con<<"PING"<<std::endl;
1373                         throw ProcessedSilentlyException("Got a PING");
1374                 }
1375                 else if(controltype == CONTROLTYPE_DISCO)
1376                 {
1377                         // Just ignore it, the incoming data already reset
1378                         // the timeout counter
1379                         PrintInfo();
1380                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1381                         
1382                         if(deletePeer(peer_id, false) == false)
1383                         {
1384                                 PrintInfo(derr_con);
1385                                 derr_con<<"DISCO: Peer not found"<<std::endl;
1386                         }
1387
1388                         throw ProcessedSilentlyException("Got a DISCO");
1389                 }
1390                 else{
1391                         PrintInfo(derr_con);
1392                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1393                                         <<((int)controltype&0xff)<<std::endl;
1394                         throw InvalidIncomingDataException("Invalid control type");
1395                 }
1396         }
1397         else if(type == TYPE_ORIGINAL)
1398         {
1399                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1400                         throw InvalidIncomingDataException
1401                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1402                 PrintInfo();
1403                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1404                                 <<std::endl;
1405                 // Get the inside packet out and return it
1406                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1407                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1408                 return payload;
1409         }
1410         else if(type == TYPE_SPLIT)
1411         {
1412                 // We have to create a packet again for buffering
1413                 // This isn't actually too bad an idea.
1414                 BufferedPacket packet = makePacket(
1415                                 getPeer(peer_id)->address,
1416                                 packetdata,
1417                                 GetProtocolID(),
1418                                 peer_id,
1419                                 channelnum);
1420                 // Buffer the packet
1421                 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1422                 if(data.getSize() != 0)
1423                 {
1424                         PrintInfo();
1425                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1426                                         <<"size="<<data.getSize()<<std::endl;
1427                         return data;
1428                 }
1429                 PrintInfo();
1430                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1431                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1432         }
1433         else if(type == TYPE_RELIABLE)
1434         {
1435                 // Recursive reliable packets not allowed
1436                 assert(reliable == false);
1437
1438                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1439                         throw InvalidIncomingDataException
1440                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1441
1442                 u16 seqnum = readU16(&packetdata[1]);
1443
1444                 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1445                 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1446                 
1447                 PrintInfo();
1448                 if(is_future_packet)
1449                         dout_con<<"BUFFERING";
1450                 else if(is_old_packet)
1451                         dout_con<<"OLD";
1452                 else
1453                         dout_con<<"RECUR";
1454                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1455                                 <<" next="<<channel->next_incoming_seqnum;
1456                 dout_con<<" [sending CONTROLTYPE_ACK"
1457                                 " to peer_id="<<peer_id<<"]";
1458                 dout_con<<std::endl;
1459                 
1460                 //DEBUG
1461                 //assert(channel->incoming_reliables.size() < 100);
1462
1463                 // Send a CONTROLTYPE_ACK
1464                 SharedBuffer<u8> reply(4);
1465                 writeU8(&reply[0], TYPE_CONTROL);
1466                 writeU8(&reply[1], CONTROLTYPE_ACK);
1467                 writeU16(&reply[2], seqnum);
1468                 rawSendAsPacket(peer_id, channelnum, reply, false);
1469
1470                 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1471                 if(is_future_packet)
1472                 {
1473                         /*PrintInfo();
1474                         dout_con<<"Buffering reliable packet (seqnum="
1475                                         <<seqnum<<")"<<std::endl;*/
1476                         
1477                         // This one comes later, buffer it.
1478                         // Actually we have to make a packet to buffer one.
1479                         // Well, we have all the ingredients, so just do it.
1480                         BufferedPacket packet = makePacket(
1481                                         getPeer(peer_id)->address,
1482                                         packetdata,
1483                                         GetProtocolID(),
1484                                         peer_id,
1485                                         channelnum);
1486                         try{
1487                                 channel->incoming_reliables.insert(packet);
1488                                 
1489                                 /*PrintInfo();
1490                                 dout_con<<"INCOMING: ";
1491                                 channel->incoming_reliables.print();
1492                                 dout_con<<std::endl;*/
1493                         }
1494                         catch(AlreadyExistsException &e)
1495                         {
1496                         }
1497
1498                         throw ProcessedSilentlyException("Buffered future reliable packet");
1499                 }
1500                 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1501                 else if(is_old_packet)
1502                 {
1503                         // An old packet, dump it
1504                         throw InvalidIncomingDataException("Got an old reliable packet");
1505                 }
1506
1507                 channel->next_incoming_seqnum++;
1508
1509                 // Get out the inside packet and re-process it
1510                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1511                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1512
1513                 return processPacket(channel, payload, peer_id, channelnum, true);
1514         }
1515         else
1516         {
1517                 PrintInfo(derr_con);
1518                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1519                 throw InvalidIncomingDataException("Invalid packet type");
1520         }
1521         
1522         // We should never get here.
1523         // If you get here, add an exception or a return to some of the
1524         // above conditionals.
1525         assert(0);
1526         throw BaseException("Error in Channel::ProcessPacket()");
1527 }
1528
1529 bool Connection::deletePeer(u16 peer_id, bool timeout)
1530 {
1531         if(m_peers.find(peer_id) == m_peers.end())
1532                 return false;
1533         
1534         Peer *peer = m_peers[peer_id];
1535
1536         // Create event
1537         ConnectionEvent e;
1538         e.peerRemoved(peer_id, timeout, peer->address);
1539         putEvent(e);
1540
1541         delete m_peers[peer_id];
1542         m_peers.erase(peer_id);
1543         return true;
1544 }
1545
1546 /* Interface */
1547
1548 ConnectionEvent Connection::getEvent()
1549 {
1550         if(m_event_queue.empty()){
1551                 ConnectionEvent e;
1552                 e.type = CONNEVENT_NONE;
1553                 return e;
1554         }
1555         return m_event_queue.pop_front();
1556 }
1557
1558 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1559 {
1560         try{
1561                 return m_event_queue.pop_front(timeout_ms);
1562         } catch(ItemNotFoundException &ex){
1563                 ConnectionEvent e;
1564                 e.type = CONNEVENT_NONE;
1565                 return e;
1566         }
1567 }
1568
1569 void Connection::putCommand(ConnectionCommand &c)
1570 {
1571         m_command_queue.push_back(c);
1572 }
1573
1574 void Connection::Serve(unsigned short port)
1575 {
1576         ConnectionCommand c;
1577         c.serve(port);
1578         putCommand(c);
1579 }
1580
1581 void Connection::Connect(Address address)
1582 {
1583         ConnectionCommand c;
1584         c.connect(address);
1585         putCommand(c);
1586 }
1587
1588 bool Connection::Connected()
1589 {
1590         JMutexAutoLock peerlock(m_peers_mutex);
1591
1592         if(m_peers.size() != 1)
1593                 return false;
1594                 
1595         std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1596         if(node == m_peers.end())
1597                 return false;
1598         
1599         if(m_peer_id == PEER_ID_INEXISTENT)
1600                 return false;
1601         
1602         return true;
1603 }
1604
1605 void Connection::Disconnect()
1606 {
1607         ConnectionCommand c;
1608         c.disconnect();
1609         putCommand(c);
1610 }
1611
1612 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1613 {
1614         for(;;){
1615                 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1616                 if(e.type != CONNEVENT_NONE)
1617                         dout_con<<getDesc()<<": Receive: got event: "
1618                                         <<e.describe()<<std::endl;
1619                 switch(e.type){
1620                 case CONNEVENT_NONE:
1621                         throw NoIncomingDataException("No incoming data");
1622                 case CONNEVENT_DATA_RECEIVED:
1623                         peer_id = e.peer_id;
1624                         data = SharedBuffer<u8>(e.data);
1625                         return e.data.getSize();
1626                 case CONNEVENT_PEER_ADDED: {
1627                         Peer tmp(e.peer_id, e.address);
1628                         if(m_bc_peerhandler)
1629                                 m_bc_peerhandler->peerAdded(&tmp);
1630                         continue; }
1631                 case CONNEVENT_PEER_REMOVED: {
1632                         Peer tmp(e.peer_id, e.address);
1633                         if(m_bc_peerhandler)
1634                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1635                         continue; }
1636                 case CONNEVENT_BIND_FAILED:
1637                         throw ConnectionBindFailed("Failed to bind socket "
1638                                         "(port already in use?)");
1639                 }
1640         }
1641         throw NoIncomingDataException("No incoming data");
1642 }
1643
1644 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1645 {
1646         assert(channelnum < CHANNEL_COUNT);
1647
1648         ConnectionCommand c;
1649         c.sendToAll(channelnum, data, reliable);
1650         putCommand(c);
1651 }
1652
1653 void Connection::Send(u16 peer_id, u8 channelnum,
1654                 SharedBuffer<u8> data, bool reliable)
1655 {
1656         assert(channelnum < CHANNEL_COUNT);
1657
1658         ConnectionCommand c;
1659         c.send(peer_id, channelnum, data, reliable);
1660         putCommand(c);
1661 }
1662
1663 void Connection::RunTimeouts(float dtime)
1664 {
1665         // No-op
1666 }
1667
1668 Address Connection::GetPeerAddress(u16 peer_id)
1669 {
1670         JMutexAutoLock peerlock(m_peers_mutex);
1671         return getPeer(peer_id)->address;
1672 }
1673
1674 float Connection::GetPeerAvgRTT(u16 peer_id)
1675 {
1676         JMutexAutoLock peerlock(m_peers_mutex);
1677         return getPeer(peer_id)->avg_rtt;
1678 }
1679
1680 void Connection::DeletePeer(u16 peer_id)
1681 {
1682         ConnectionCommand c;
1683         c.deletePeer(peer_id);
1684         putCommand(c);
1685 }
1686
1687 void Connection::PrintInfo(std::ostream &out)
1688 {
1689         out<<getDesc()<<": ";
1690 }
1691
1692 void Connection::PrintInfo()
1693 {
1694         PrintInfo(dout_con);
1695 }
1696
1697 std::string Connection::getDesc()
1698 {
1699         return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
1700 }
1701
1702 } // namespace
1703