removed duplicate "bmp"
[oweals/minetest.git] / src / connection.cpp
1 /*
2 Minetest-c55
3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23
24 namespace con
25 {
26
27 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
28                 u32 protocol_id, u16 sender_peer_id, u8 channel)
29 {
30         u32 packet_size = datasize + BASE_HEADER_SIZE;
31         BufferedPacket p(packet_size);
32         p.address = address;
33
34         writeU32(&p.data[0], protocol_id);
35         writeU16(&p.data[4], sender_peer_id);
36         writeU8(&p.data[6], channel);
37
38         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
39
40         return p;
41 }
42
43 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
44                 u32 protocol_id, u16 sender_peer_id, u8 channel)
45 {
46         return makePacket(address, *data, data.getSize(),
47                         protocol_id, sender_peer_id, channel);
48 }
49
50 SharedBuffer<u8> makeOriginalPacket(
51                 SharedBuffer<u8> data)
52 {
53         u32 header_size = 1;
54         u32 packet_size = data.getSize() + header_size;
55         SharedBuffer<u8> b(packet_size);
56
57         writeU8(&b[0], TYPE_ORIGINAL);
58
59         memcpy(&b[header_size], *data, data.getSize());
60
61         return b;
62 }
63
64 core::list<SharedBuffer<u8> > makeSplitPacket(
65                 SharedBuffer<u8> data,
66                 u32 chunksize_max,
67                 u16 seqnum)
68 {
69         // Chunk packets, containing the TYPE_SPLIT header
70         core::list<SharedBuffer<u8> > chunks;
71         
72         u32 chunk_header_size = 7;
73         u32 maximum_data_size = chunksize_max - chunk_header_size;
74         u32 start = 0;
75         u32 end = 0;
76         u32 chunk_num = 0;
77         do{
78                 end = start + maximum_data_size - 1;
79                 if(end > data.getSize() - 1)
80                         end = data.getSize() - 1;
81                 
82                 u32 payload_size = end - start + 1;
83                 u32 packet_size = chunk_header_size + payload_size;
84
85                 SharedBuffer<u8> chunk(packet_size);
86                 
87                 writeU8(&chunk[0], TYPE_SPLIT);
88                 writeU16(&chunk[1], seqnum);
89                 // [3] u16 chunk_count is written at next stage
90                 writeU16(&chunk[5], chunk_num);
91                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
92
93                 chunks.push_back(chunk);
94                 
95                 start = end + 1;
96                 chunk_num++;
97         }
98         while(end != data.getSize() - 1);
99
100         u16 chunk_count = chunks.getSize();
101
102         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
103         for(; i != chunks.end(); i++)
104         {
105                 // Write chunk_count
106                 writeU16(&((*i)[3]), chunk_count);
107         }
108
109         return chunks;
110 }
111
112 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
113                 SharedBuffer<u8> data,
114                 u32 chunksize_max,
115                 u16 &split_seqnum)
116 {
117         u32 original_header_size = 1;
118         core::list<SharedBuffer<u8> > list;
119         if(data.getSize() + original_header_size > chunksize_max)
120         {
121                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
122                 split_seqnum++;
123                 return list;
124         }
125         else
126         {
127                 list.push_back(makeOriginalPacket(data));
128         }
129         return list;
130 }
131
132 SharedBuffer<u8> makeReliablePacket(
133                 SharedBuffer<u8> data,
134                 u16 seqnum)
135 {
136         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
137         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
138                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
139         u32 header_size = 3;
140         u32 packet_size = data.getSize() + header_size;
141         SharedBuffer<u8> b(packet_size);
142
143         writeU8(&b[0], TYPE_RELIABLE);
144         writeU16(&b[1], seqnum);
145
146         memcpy(&b[header_size], *data, data.getSize());
147
148         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
149                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
150         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
151         return b;
152 }
153
154 /*
155         ReliablePacketBuffer
156 */
157
158 void ReliablePacketBuffer::print()
159 {
160         core::list<BufferedPacket>::Iterator i;
161         i = m_list.begin();
162         for(; i != m_list.end(); i++)
163         {
164                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
165                 dout_con<<s<<" ";
166         }
167 }
168 bool ReliablePacketBuffer::empty()
169 {
170         return m_list.empty();
171 }
172 u32 ReliablePacketBuffer::size()
173 {
174         return m_list.getSize();
175 }
176 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
177 {
178         core::list<BufferedPacket>::Iterator i;
179         i = m_list.begin();
180         for(; i != m_list.end(); i++)
181         {
182                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
183                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
184                                 <<", comparing to s="<<s<<std::endl;*/
185                 if(s == seqnum)
186                         break;
187         }
188         return i;
189 }
190 RPBSearchResult ReliablePacketBuffer::notFound()
191 {
192         return m_list.end();
193 }
194 u16 ReliablePacketBuffer::getFirstSeqnum()
195 {
196         if(empty())
197                 throw NotFoundException("Buffer is empty");
198         BufferedPacket p = *m_list.begin();
199         return readU16(&p.data[BASE_HEADER_SIZE+1]);
200 }
201 BufferedPacket ReliablePacketBuffer::popFirst()
202 {
203         if(empty())
204                 throw NotFoundException("Buffer is empty");
205         BufferedPacket p = *m_list.begin();
206         core::list<BufferedPacket>::Iterator i = m_list.begin();
207         m_list.erase(i);
208         return p;
209 }
210 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
211 {
212         RPBSearchResult r = findPacket(seqnum);
213         if(r == notFound()){
214                 dout_con<<"Not found"<<std::endl;
215                 throw NotFoundException("seqnum not found in buffer");
216         }
217         BufferedPacket p = *r;
218         m_list.erase(r);
219         return p;
220 }
221 void ReliablePacketBuffer::insert(BufferedPacket &p)
222 {
223         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
224         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
225         assert(type == TYPE_RELIABLE);
226         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
227
228         // Find the right place for the packet and insert it there
229
230         // If list is empty, just add it
231         if(m_list.empty())
232         {
233                 m_list.push_back(p);
234                 // Done.
235                 return;
236         }
237         // Otherwise find the right place
238         core::list<BufferedPacket>::Iterator i;
239         i = m_list.begin();
240         // Find the first packet in the list which has a higher seqnum
241         for(; i != m_list.end(); i++){
242                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
243                 if(s == seqnum){
244                         throw AlreadyExistsException("Same seqnum in list");
245                 }
246                 if(seqnum_higher(s, seqnum)){
247                         break;
248                 }
249         }
250         // If we're at the end of the list, add the packet to the
251         // end of the list
252         if(i == m_list.end())
253         {
254                 m_list.push_back(p);
255                 // Done.
256                 return;
257         }
258         // Insert before i
259         m_list.insert_before(i, p);
260 }
261
262 void ReliablePacketBuffer::incrementTimeouts(float dtime)
263 {
264         core::list<BufferedPacket>::Iterator i;
265         i = m_list.begin();
266         for(; i != m_list.end(); i++){
267                 i->time += dtime;
268                 i->totaltime += dtime;
269         }
270 }
271
272 void ReliablePacketBuffer::resetTimedOuts(float timeout)
273 {
274         core::list<BufferedPacket>::Iterator i;
275         i = m_list.begin();
276         for(; i != m_list.end(); i++){
277                 if(i->time >= timeout)
278                         i->time = 0.0;
279         }
280 }
281
282 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
283 {
284         core::list<BufferedPacket>::Iterator i;
285         i = m_list.begin();
286         for(; i != m_list.end(); i++){
287                 if(i->totaltime >= timeout)
288                         return true;
289         }
290         return false;
291 }
292
293 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
294 {
295         core::list<BufferedPacket> timed_outs;
296         core::list<BufferedPacket>::Iterator i;
297         i = m_list.begin();
298         for(; i != m_list.end(); i++)
299         {
300                 if(i->time >= timeout)
301                         timed_outs.push_back(*i);
302         }
303         return timed_outs;
304 }
305
306 /*
307         IncomingSplitBuffer
308 */
309
310 IncomingSplitBuffer::~IncomingSplitBuffer()
311 {
312         core::map<u16, IncomingSplitPacket*>::Iterator i;
313         i = m_buf.getIterator();
314         for(; i.atEnd() == false; i++)
315         {
316                 delete i.getNode()->getValue();
317         }
318 }
319 /*
320         This will throw a GotSplitPacketException when a full
321         split packet is constructed.
322 */
323 void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
324 {
325         u32 headersize = BASE_HEADER_SIZE + 7;
326         assert(p.data.getSize() >= headersize);
327         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
328         assert(type == TYPE_SPLIT);
329         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
330         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
331         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
332
333         // Add if doesn't exist
334         if(m_buf.find(seqnum) == NULL)
335         {
336                 IncomingSplitPacket *sp = new IncomingSplitPacket();
337                 sp->chunk_count = chunk_count;
338                 sp->reliable = reliable;
339                 m_buf[seqnum] = sp;
340         }
341         
342         IncomingSplitPacket *sp = m_buf[seqnum];
343         
344         // TODO: These errors should be thrown or something? Dunno.
345         if(chunk_count != sp->chunk_count)
346                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
347                                 <<" != sp->chunk_count="<<sp->chunk_count
348                                 <<std::endl;
349         if(reliable != sp->reliable)
350                 derr_con<<"Connection: WARNING: reliable="<<reliable
351                                 <<" != sp->reliable="<<sp->reliable
352                                 <<std::endl;
353
354         // If chunk already exists, cancel
355         if(sp->chunks.find(chunk_num) != NULL)
356                 throw AlreadyExistsException("Chunk already in buffer");
357         
358         // Cut chunk data out of packet
359         u32 chunkdatasize = p.data.getSize() - headersize;
360         SharedBuffer<u8> chunkdata(chunkdatasize);
361         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
362         
363         // Set chunk data in buffer
364         sp->chunks[chunk_num] = chunkdata;
365         
366         // If not all chunks are received, return
367         if(sp->allReceived() == false)
368                 return;
369
370         // Calculate total size
371         u32 totalsize = 0;
372         core::map<u16, SharedBuffer<u8> >::Iterator i;
373         i = sp->chunks.getIterator();
374         for(; i.atEnd() == false; i++)
375         {
376                 totalsize += i.getNode()->getValue().getSize();
377         }
378         
379         SharedBuffer<u8> fulldata(totalsize);
380
381         // Copy chunks to data buffer
382         u32 start = 0;
383         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
384                         chunk_i++)
385         {
386                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
387                 u16 chunkdatasize = buf.getSize();
388                 memcpy(&fulldata[start], *buf, chunkdatasize);
389                 start += chunkdatasize;;
390         }
391
392         // Remove sp from buffer
393         m_buf.remove(seqnum);
394         delete sp;
395         
396         throw GotSplitPacketException(fulldata);
397 }
398 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
399 {
400         core::list<u16> remove_queue;
401         core::map<u16, IncomingSplitPacket*>::Iterator i;
402         i = m_buf.getIterator();
403         for(; i.atEnd() == false; i++)
404         {
405                 IncomingSplitPacket *p = i.getNode()->getValue();
406                 // Reliable ones are not removed by timeout
407                 if(p->reliable == true)
408                         continue;
409                 p->time += dtime;
410                 if(p->time >= timeout)
411                         remove_queue.push_back(i.getNode()->getKey());
412         }
413         core::list<u16>::Iterator j;
414         j = remove_queue.begin();
415         for(; j != remove_queue.end(); j++)
416         {
417                 dout_con<<"NOTE: Removing timed out unreliable split packet"
418                                 <<std::endl;
419                 delete m_buf[*j];
420                 m_buf.remove(*j);
421         }
422 }
423
424 /*
425         Channel
426 */
427
428 Channel::Channel()
429 {
430         next_outgoing_seqnum = SEQNUM_INITIAL;
431         next_incoming_seqnum = SEQNUM_INITIAL;
432         next_outgoing_split_seqnum = SEQNUM_INITIAL;
433 }
434 Channel::~Channel()
435 {
436 }
437
438 /*
439         Peer
440 */
441
442 Peer::Peer(u16 a_id, Address a_address)
443 {
444         id = a_id;
445         address = a_address;
446         timeout_counter = 0.0;
447         //resend_timeout = RESEND_TIMEOUT_MINIMUM;
448         ping_timer = 0.0;
449         resend_timeout = 0.5;
450         avg_rtt = -1.0;
451         has_sent_with_id = false;
452 }
453 Peer::~Peer()
454 {
455 }
456
457 void Peer::reportRTT(float rtt)
458 {
459         if(rtt < -0.999)
460         {}
461         else if(avg_rtt < 0.0)
462                 avg_rtt = rtt;
463         else
464                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
465         
466         // Calculate resend_timeout
467
468         /*int reliable_count = 0;
469         for(int i=0; i<CHANNEL_COUNT; i++)
470         {
471                 reliable_count += channels[i].outgoing_reliables.size();
472         }
473         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
474                         * ((float)reliable_count * 1);*/
475         
476         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
477         if(timeout < RESEND_TIMEOUT_MIN)
478                 timeout = RESEND_TIMEOUT_MIN;
479         if(timeout > RESEND_TIMEOUT_MAX)
480                 timeout = RESEND_TIMEOUT_MAX;
481         resend_timeout = timeout;
482 }
483                                 
484 /*
485         Connection
486 */
487
488 Connection::Connection(
489         u32 protocol_id,
490         u32 max_packet_size,
491         float timeout,
492         PeerHandler *peerhandler
493 )
494 {
495         assert(peerhandler != NULL);
496
497         m_protocol_id = protocol_id;
498         m_max_packet_size = max_packet_size;
499         m_timeout = timeout;
500         m_peer_id = PEER_ID_INEXISTENT;
501         //m_waiting_new_peer_id = false;
502         m_indentation = 0;
503         m_peerhandler = peerhandler;
504 }
505
506 Connection::~Connection()
507 {
508         // Clear peers
509         core::map<u16, Peer*>::Iterator j;
510         j = m_peers.getIterator();
511         for(; j.atEnd() == false; j++)
512         {
513                 Peer *peer = j.getNode()->getValue();
514                 delete peer;
515         }
516 }
517
518 void Connection::Serve(unsigned short port)
519 {
520         m_socket.Bind(port);
521         m_peer_id = PEER_ID_SERVER;
522 }
523
524 void Connection::Connect(Address address)
525 {
526         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
527         if(node != NULL){
528                 throw ConnectionException("Already connected to a server");
529         }
530
531         Peer *peer = new Peer(PEER_ID_SERVER, address);
532         m_peers.insert(peer->id, peer);
533         m_peerhandler->peerAdded(peer);
534         
535         m_socket.Bind(0);
536         
537         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
538         m_peer_id = PEER_ID_INEXISTENT;
539         SharedBuffer<u8> data(0);
540         Send(PEER_ID_SERVER, 0, data, true);
541
542         //m_waiting_new_peer_id = true;
543 }
544
545 void Connection::Disconnect()
546 {
547         // Create and send DISCO packet
548         SharedBuffer<u8> data(2);
549         writeU8(&data[0], TYPE_CONTROL);
550         writeU8(&data[1], CONTROLTYPE_DISCO);
551         
552         // Send to all
553         core::map<u16, Peer*>::Iterator j;
554         j = m_peers.getIterator();
555         for(; j.atEnd() == false; j++)
556         {
557                 Peer *peer = j.getNode()->getValue();
558                 SendAsPacket(peer->id, 0, data, false);
559         }
560 }
561
562 bool Connection::Connected()
563 {
564         if(m_peers.size() != 1)
565                 return false;
566                 
567         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
568         if(node == NULL)
569                 return false;
570         
571         if(m_peer_id == PEER_ID_INEXISTENT)
572                 return false;
573         
574         return true;
575 }
576
577 SharedBuffer<u8> Channel::ProcessPacket(
578                 SharedBuffer<u8> packetdata,
579                 Connection *con,
580                 u16 peer_id,
581                 u8 channelnum,
582                 bool reliable)
583 {
584         IndentationRaiser iraiser(&(con->m_indentation));
585
586         if(packetdata.getSize() < 1)
587                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
588
589         u8 type = readU8(&packetdata[0]);
590         
591         if(type == TYPE_CONTROL)
592         {
593                 if(packetdata.getSize() < 2)
594                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
595
596                 u8 controltype = readU8(&packetdata[1]);
597
598                 if(controltype == CONTROLTYPE_ACK)
599                 {
600                         if(packetdata.getSize() < 4)
601                                 throw InvalidIncomingDataException
602                                                 ("packetdata.getSize() < 4 (ACK header size)");
603
604                         u16 seqnum = readU16(&packetdata[2]);
605                         con->PrintInfo();
606                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
607                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
608                                         <<", seqnum="<<seqnum<<std::endl;
609
610                         try{
611                                 BufferedPacket p = outgoing_reliables.popSeqnum(seqnum);
612                                 // Get round trip time
613                                 float rtt = p.totaltime;
614
615                                 // Let peer calculate stuff according to it
616                                 // (avg_rtt and resend_timeout)
617                                 Peer *peer = con->GetPeer(peer_id);
618                                 peer->reportRTT(rtt);
619
620                                 //con->PrintInfo(dout_con);
621                                 //dout_con<<"RTT = "<<rtt<<std::endl;
622
623                                 /*dout_con<<"OUTGOING: ";
624                                 con->PrintInfo();
625                                 outgoing_reliables.print();
626                                 dout_con<<std::endl;*/
627                         }
628                         catch(NotFoundException &e){
629                                 con->PrintInfo(derr_con);
630                                 derr_con<<"WARNING: ACKed packet not "
631                                                 "in outgoing queue"
632                                                 <<std::endl;
633                         }
634
635                         throw ProcessedSilentlyException("Got an ACK");
636                 }
637                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
638                 {
639                         if(packetdata.getSize() < 4)
640                                 throw InvalidIncomingDataException
641                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
642                         u16 peer_id_new = readU16(&packetdata[2]);
643                         con->PrintInfo();
644                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
645
646                         if(con->GetPeerID() != PEER_ID_INEXISTENT)
647                         {
648                                 con->PrintInfo(derr_con);
649                                 derr_con<<"WARNING: Not changing"
650                                                 " existing peer id."<<std::endl;
651                         }
652                         else
653                         {
654                                 dout_con<<"changing."<<std::endl;
655                                 con->SetPeerID(peer_id_new);
656                         }
657                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
658                 }
659                 else if(controltype == CONTROLTYPE_PING)
660                 {
661                         // Just ignore it, the incoming data already reset
662                         // the timeout counter
663                         con->PrintInfo();
664                         dout_con<<"PING"<<std::endl;
665                         throw ProcessedSilentlyException("Got a PING");
666                 }
667                 else if(controltype == CONTROLTYPE_DISCO)
668                 {
669                         // Just ignore it, the incoming data already reset
670                         // the timeout counter
671                         con->PrintInfo();
672                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
673                         
674                         if(con->deletePeer(peer_id, false) == false)
675                         {
676                                 con->PrintInfo(derr_con);
677                                 derr_con<<"DISCO: Peer not found"<<std::endl;
678                         }
679
680                         throw ProcessedSilentlyException("Got a DISCO");
681                 }
682                 else{
683                         con->PrintInfo(derr_con);
684                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
685                                         <<((int)controltype&0xff)<<std::endl;
686                         throw InvalidIncomingDataException("Invalid control type");
687                 }
688         }
689         else if(type == TYPE_ORIGINAL)
690         {
691                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
692                         throw InvalidIncomingDataException
693                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
694                 con->PrintInfo();
695                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
696                                 <<std::endl;
697                 // Get the inside packet out and return it
698                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
699                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
700                 return payload;
701         }
702         else if(type == TYPE_SPLIT)
703         {
704                 // We have to create a packet again for buffering
705                 // This isn't actually too bad an idea.
706                 BufferedPacket packet = makePacket(
707                                 con->GetPeer(peer_id)->address,
708                                 packetdata,
709                                 con->GetProtocolID(),
710                                 peer_id,
711                                 channelnum);
712                 try{
713                         // Buffer the packet
714                         incoming_splits.insert(packet, reliable);
715                 }
716                 // This exception happens when all the pieces of a packet
717                 // are collected.
718                 catch(GotSplitPacketException &e)
719                 {
720                         con->PrintInfo();
721                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
722                                         <<"size="<<e.getData().getSize()<<std::endl;
723                         return e.getData();
724                 }
725                 con->PrintInfo();
726                 dout_con<<"BUFFERING TYPE_SPLIT"<<std::endl;
727                 throw ProcessedSilentlyException("Buffered a split packet chunk");
728         }
729         else if(type == TYPE_RELIABLE)
730         {
731                 // Recursive reliable packets not allowed
732                 assert(reliable == false);
733
734                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
735                         throw InvalidIncomingDataException
736                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
737
738                 u16 seqnum = readU16(&packetdata[1]);
739
740                 bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum);
741                 bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum);
742                 
743                 con->PrintInfo();
744                 if(is_future_packet)
745                         dout_con<<"BUFFERING";
746                 else if(is_old_packet)
747                         dout_con<<"OLD";
748                 else
749                         dout_con<<"RECUR";
750                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
751                                 <<" next="<<next_incoming_seqnum;
752                 dout_con<<" [sending CONTROLTYPE_ACK"
753                                 " to peer_id="<<peer_id<<"]";
754                 dout_con<<std::endl;
755                 
756                 //DEBUG
757                 //assert(incoming_reliables.size() < 100);
758
759                 // Send a CONTROLTYPE_ACK
760                 SharedBuffer<u8> reply(4);
761                 writeU8(&reply[0], TYPE_CONTROL);
762                 writeU8(&reply[1], CONTROLTYPE_ACK);
763                 writeU16(&reply[2], seqnum);
764                 con->SendAsPacket(peer_id, channelnum, reply, false);
765
766                 //if(seqnum_higher(seqnum, next_incoming_seqnum))
767                 if(is_future_packet)
768                 {
769                         /*con->PrintInfo();
770                         dout_con<<"Buffering reliable packet (seqnum="
771                                         <<seqnum<<")"<<std::endl;*/
772                         
773                         // This one comes later, buffer it.
774                         // Actually we have to make a packet to buffer one.
775                         // Well, we have all the ingredients, so just do it.
776                         BufferedPacket packet = makePacket(
777                                         con->GetPeer(peer_id)->address,
778                                         packetdata,
779                                         con->GetProtocolID(),
780                                         peer_id,
781                                         channelnum);
782                         try{
783                                 incoming_reliables.insert(packet);
784                                 
785                                 /*con->PrintInfo();
786                                 dout_con<<"INCOMING: ";
787                                 incoming_reliables.print();
788                                 dout_con<<std::endl;*/
789                         }
790                         catch(AlreadyExistsException &e)
791                         {
792                         }
793
794                         throw ProcessedSilentlyException("Buffered future reliable packet");
795                 }
796                 //else if(seqnum_higher(next_incoming_seqnum, seqnum))
797                 else if(is_old_packet)
798                 {
799                         // An old packet, dump it
800                         throw InvalidIncomingDataException("Got an old reliable packet");
801                 }
802
803                 next_incoming_seqnum++;
804
805                 // Get out the inside packet and re-process it
806                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
807                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
808
809                 return ProcessPacket(payload, con, peer_id, channelnum, true);
810         }
811         else
812         {
813                 con->PrintInfo(derr_con);
814                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
815                 throw InvalidIncomingDataException("Invalid packet type");
816         }
817         
818         // We should never get here.
819         // If you get here, add an exception or a return to some of the
820         // above conditionals.
821         assert(0);
822         throw BaseException("Error in Channel::ProcessPacket()");
823 }
824
825 SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con,
826                 u16 &peer_id)
827 {
828         u16 firstseqnum = 0;
829         // Clear old packets from start of buffer
830         try{
831         for(;;){
832                 firstseqnum = incoming_reliables.getFirstSeqnum();
833                 if(seqnum_higher(next_incoming_seqnum, firstseqnum))
834                         incoming_reliables.popFirst();
835                 else
836                         break;
837         }
838         // This happens if all packets are old
839         }catch(con::NotFoundException)
840         {}
841         
842         if(incoming_reliables.empty() == false)
843         {
844                 if(firstseqnum == next_incoming_seqnum)
845                 {
846                         BufferedPacket p = incoming_reliables.popFirst();
847                         
848                         peer_id = readPeerId(*p.data);
849                         u8 channelnum = readChannel(*p.data);
850                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
851
852                         con->PrintInfo();
853                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
854                                         <<" seqnum="<<seqnum
855                                         <<" peer_id="<<peer_id
856                                         <<" channel="<<((int)channelnum&0xff)
857                                         <<std::endl;
858
859                         next_incoming_seqnum++;
860                         
861                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
862                         // Get out the inside packet and re-process it
863                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
864                         memcpy(*payload, &p.data[headers_size], payload.getSize());
865
866                         return ProcessPacket(payload, con, peer_id, channelnum, true);
867                 }
868         }
869                 
870         throw NoIncomingDataException("No relevant data in buffers");
871 }
872
873 SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id)
874 {
875         core::map<u16, Peer*>::Iterator j;
876         j = m_peers.getIterator();
877         for(; j.atEnd() == false; j++)
878         {
879                 Peer *peer = j.getNode()->getValue();
880                 for(u16 i=0; i<CHANNEL_COUNT; i++)
881                 {
882                         Channel *channel = &peer->channels[i];
883                         try{
884                                 SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers
885                                                 (this, peer_id);
886
887                                 return resultdata;
888                         }
889                         catch(NoIncomingDataException &e)
890                         {
891                         }
892                         catch(InvalidIncomingDataException &e)
893                         {
894                         }
895                         catch(ProcessedSilentlyException &e)
896                         {
897                         }
898                 }
899         }
900         throw NoIncomingDataException("No relevant data in buffers");
901 }
902
903 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
904 {
905         /*
906                 Receive a packet from the network
907         */
908         
909         // TODO: We can not know how many layers of header there are.
910         // For now, just assume there are no other than the base headers.
911         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
912         Buffer<u8> packetdata(packet_maxsize);
913         
914         for(;;)
915         {
916         try
917         {
918                 /*
919                         Check if some buffer has relevant data
920                 */
921                 try{
922                         SharedBuffer<u8> resultdata = GetFromBuffers(peer_id);
923
924                         if(datasize < resultdata.getSize())
925                                 throw InvalidIncomingDataException
926                                                 ("Buffer too small for received data");
927                                 
928                         memcpy(data, *resultdata, resultdata.getSize());
929                         return resultdata.getSize();
930                 }
931                 catch(NoIncomingDataException &e)
932                 {
933                 }
934         
935                 Address sender;
936
937                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
938
939                 if(received_size < 0)
940                         throw NoIncomingDataException("No incoming data");
941                 if(received_size < BASE_HEADER_SIZE)
942                         throw InvalidIncomingDataException("No full header received");
943                 if(readU32(&packetdata[0]) != m_protocol_id)
944                         throw InvalidIncomingDataException("Invalid protocol id");
945                 
946                 peer_id = readPeerId(*packetdata);
947                 u8 channelnum = readChannel(*packetdata);
948                 if(channelnum > CHANNEL_COUNT-1){
949                         PrintInfo(derr_con);
950                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
951                         throw InvalidIncomingDataException("Channel doesn't exist");
952                 }
953
954                 if(peer_id == PEER_ID_INEXISTENT)
955                 {
956                         /*
957                                 Somebody is trying to send stuff to us with no peer id.
958                                 
959                                 Check if the same address and port was added to our peer
960                                 list before.
961                                 Allow only entries that have has_sent_with_id==false.
962                         */
963
964                         core::map<u16, Peer*>::Iterator j;
965                         j = m_peers.getIterator();
966                         for(; j.atEnd() == false; j++)
967                         {
968                                 Peer *peer = j.getNode()->getValue();
969                                 if(peer->has_sent_with_id)
970                                         continue;
971                                 if(peer->address == sender)
972                                         break;
973                         }
974                         
975                         /*
976                                 If no peer was found with the same address and port,
977                                 we shall assume it is a new peer and create an entry.
978                         */
979                         if(j.atEnd())
980                         {
981                                 // Pass on to adding the peer
982                         }
983                         // Else: A peer was found.
984                         else
985                         {
986                                 Peer *peer = j.getNode()->getValue();
987                                 peer_id = peer->id;
988                                 PrintInfo(derr_con);
989                                 derr_con<<"WARNING: Assuming unknown peer to be "
990                                                 <<"peer_id="<<peer_id<<std::endl;
991                         }
992                 }
993                 
994                 /*
995                         The peer was not found in our lists. Add it.
996                 */
997                 if(peer_id == PEER_ID_INEXISTENT)
998                 {
999                         // Somebody wants to make a new connection
1000
1001                         // Get a unique peer id (2 or higher)
1002                         u16 peer_id_new = 2;
1003                         /*
1004                                 Find an unused peer id
1005                         */
1006                         for(;;)
1007                         {
1008                                 // Check if exists
1009                                 if(m_peers.find(peer_id_new) == NULL)
1010                                         break;
1011                                 // Check for overflow
1012                                 if(peer_id_new == 65535)
1013                                         throw ConnectionException
1014                                                 ("Connection ran out of peer ids");
1015                                 peer_id_new++;
1016                         }
1017
1018                         PrintInfo();
1019                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
1020                                         " giving peer_id="<<peer_id_new<<std::endl;
1021
1022                         // Create a peer
1023                         Peer *peer = new Peer(peer_id_new, sender);
1024                         m_peers.insert(peer->id, peer);
1025                         m_peerhandler->peerAdded(peer);
1026                         
1027                         // Create CONTROL packet to tell the peer id to the new peer.
1028                         SharedBuffer<u8> reply(4);
1029                         writeU8(&reply[0], TYPE_CONTROL);
1030                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
1031                         writeU16(&reply[2], peer_id_new);
1032                         SendAsPacket(peer_id_new, 0, reply, true);
1033                         
1034                         // We're now talking to a valid peer_id
1035                         peer_id = peer_id_new;
1036
1037                         // Go on and process whatever it sent
1038                 }
1039
1040                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1041
1042                 if(node == NULL)
1043                 {
1044                         // Peer not found
1045                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
1046                         // and it is invalid.
1047                         PrintInfo(derr_con);
1048                         derr_con<<"Receive(): Peer not found"<<std::endl;
1049                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
1050                 }
1051
1052                 Peer *peer = node->getValue();
1053
1054                 // Validate peer address
1055                 if(peer->address != sender)
1056                 {
1057                         PrintInfo(derr_con);
1058                         derr_con<<"Peer "<<peer_id<<" sending from different address."
1059                                         " Ignoring."<<std::endl;
1060                         throw InvalidIncomingDataException
1061                                         ("Peer sending from different address");
1062                         /*// If there is more data, receive again
1063                         if(m_socket.WaitData(0) == true)
1064                                 continue;
1065                         throw NoIncomingDataException("No incoming data (2)");*/
1066                 }
1067                 
1068                 peer->timeout_counter = 0.0;
1069
1070                 Channel *channel = &(peer->channels[channelnum]);
1071                 
1072                 // Throw the received packet to channel->processPacket()
1073
1074                 // Make a new SharedBuffer from the data without the base headers
1075                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1076                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1077                                 strippeddata.getSize());
1078                 
1079                 try{
1080                         // Process it (the result is some data with no headers made by us)
1081                         SharedBuffer<u8> resultdata = channel->ProcessPacket
1082                                         (strippeddata, this, peer_id, channelnum);
1083                         
1084                         PrintInfo();
1085                         dout_con<<"ProcessPacket returned data of size "
1086                                         <<resultdata.getSize()<<std::endl;
1087                         
1088                         if(datasize < resultdata.getSize())
1089                                 throw InvalidIncomingDataException
1090                                                 ("Buffer too small for received data");
1091                         
1092                         memcpy(data, *resultdata, resultdata.getSize());
1093                         return resultdata.getSize();
1094                 }
1095                 catch(ProcessedSilentlyException &e)
1096                 {
1097                         // If there is more data, receive again
1098                         if(m_socket.WaitData(0) == true)
1099                                 continue;
1100                 }
1101                 throw NoIncomingDataException("No incoming data (2)");
1102         } // try
1103         catch(InvalidIncomingDataException &e)
1104         {
1105                 // If there is more data, receive again
1106                 if(m_socket.WaitData(0) == true)
1107                         continue;
1108         }
1109         } // for
1110 }
1111
1112 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1113 {
1114         core::map<u16, Peer*>::Iterator j;
1115         j = m_peers.getIterator();
1116         for(; j.atEnd() == false; j++)
1117         {
1118                 Peer *peer = j.getNode()->getValue();
1119                 Send(peer->id, channelnum, data, reliable);
1120         }
1121 }
1122
1123 void Connection::Send(u16 peer_id, u8 channelnum,
1124                 SharedBuffer<u8> data, bool reliable)
1125 {
1126         assert(channelnum < CHANNEL_COUNT);
1127         
1128         Peer *peer = GetPeer(peer_id);
1129         Channel *channel = &(peer->channels[channelnum]);
1130
1131         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1132         if(reliable)
1133                 chunksize_max -= RELIABLE_HEADER_SIZE;
1134
1135         core::list<SharedBuffer<u8> > originals;
1136         originals = makeAutoSplitPacket(data, chunksize_max,
1137                         channel->next_outgoing_split_seqnum);
1138         
1139         core::list<SharedBuffer<u8> >::Iterator i;
1140         i = originals.begin();
1141         for(; i != originals.end(); i++)
1142         {
1143                 SharedBuffer<u8> original = *i;
1144                 
1145                 SendAsPacket(peer_id, channelnum, original, reliable);
1146         }
1147 }
1148
1149 void Connection::SendAsPacket(u16 peer_id, u8 channelnum,
1150                 SharedBuffer<u8> data, bool reliable)
1151 {
1152         Peer *peer = GetPeer(peer_id);
1153         Channel *channel = &(peer->channels[channelnum]);
1154
1155         if(reliable)
1156         {
1157                 u16 seqnum = channel->next_outgoing_seqnum;
1158                 channel->next_outgoing_seqnum++;
1159
1160                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1161
1162                 // Add base headers and make a packet
1163                 BufferedPacket p = makePacket(peer->address, reliable,
1164                                 m_protocol_id, m_peer_id, channelnum);
1165                 
1166                 try{
1167                         // Buffer the packet
1168                         channel->outgoing_reliables.insert(p);
1169                 }
1170                 catch(AlreadyExistsException &e)
1171                 {
1172                         PrintInfo(derr_con);
1173                         derr_con<<"WARNING: Going to send a reliable packet "
1174                                         "seqnum="<<seqnum<<" that is already "
1175                                         "in outgoing buffer"<<std::endl;
1176                         //assert(0);
1177                 }
1178                 
1179                 // Send the packet
1180                 RawSend(p);
1181         }
1182         else
1183         {
1184                 // Add base headers and make a packet
1185                 BufferedPacket p = makePacket(peer->address, data,
1186                                 m_protocol_id, m_peer_id, channelnum);
1187
1188                 // Send the packet
1189                 RawSend(p);
1190         }
1191 }
1192
1193 void Connection::RawSend(const BufferedPacket &packet)
1194 {
1195         m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1196 }
1197
1198 void Connection::RunTimeouts(float dtime)
1199 {
1200         core::list<u16> timeouted_peers;
1201         core::map<u16, Peer*>::Iterator j;
1202         j = m_peers.getIterator();
1203         for(; j.atEnd() == false; j++)
1204         {
1205                 Peer *peer = j.getNode()->getValue();
1206                 
1207                 /*
1208                         Check peer timeout
1209                 */
1210                 peer->timeout_counter += dtime;
1211                 if(peer->timeout_counter > m_timeout)
1212                 {
1213                         PrintInfo(derr_con);
1214                         derr_con<<"RunTimeouts(): Peer "<<peer->id
1215                                         <<" has timed out."
1216                                         <<" (source=peer->timeout_counter)"
1217                                         <<std::endl;
1218                         // Add peer to the list
1219                         timeouted_peers.push_back(peer->id);
1220                         // Don't bother going through the buffers of this one
1221                         continue;
1222                 }
1223
1224                 float resend_timeout = peer->resend_timeout;
1225                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1226                 {
1227                         core::list<BufferedPacket> timed_outs;
1228                         core::list<BufferedPacket>::Iterator j;
1229                         
1230                         Channel *channel = &peer->channels[i];
1231
1232                         // Remove timed out incomplete unreliable split packets
1233                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
1234                         
1235                         // Increment reliable packet times
1236                         channel->outgoing_reliables.incrementTimeouts(dtime);
1237
1238                         // Check reliable packet total times, remove peer if
1239                         // over timeout.
1240                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
1241                         {
1242                                 PrintInfo(derr_con);
1243                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
1244                                                 <<" has timed out."
1245                                                 <<" (source=reliable packet totaltime)"
1246                                                 <<std::endl;
1247                                 // Add peer to the to-be-removed list
1248                                 timeouted_peers.push_back(peer->id);
1249                                 goto nextpeer;
1250                         }
1251
1252                         // Re-send timed out outgoing reliables
1253                         
1254                         timed_outs = channel->
1255                                         outgoing_reliables.getTimedOuts(resend_timeout);
1256
1257                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
1258
1259                         j = timed_outs.begin();
1260                         for(; j != timed_outs.end(); j++)
1261                         {
1262                                 u16 peer_id = readPeerId(*(j->data));
1263                                 u8 channel = readChannel(*(j->data));
1264                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
1265
1266                                 PrintInfo(derr_con);
1267                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
1268                                 j->address.print(&derr_con);
1269                                 derr_con<<"(t/o="<<resend_timeout<<"): "
1270                                                 <<"from_peer_id="<<peer_id
1271                                                 <<", channel="<<((int)channel&0xff)
1272                                                 <<", seqnum="<<seqnum
1273                                                 <<std::endl;
1274
1275                                 RawSend(*j);
1276
1277                                 // Enlarge avg_rtt and resend_timeout:
1278                                 // The rtt will be at least the timeout.
1279                                 // NOTE: This won't affect the timeout of the next
1280                                 // checked channel because it was cached.
1281                                 peer->reportRTT(resend_timeout);
1282                         }
1283                 }
1284                 
1285                 /*
1286                         Send pings
1287                 */
1288                 peer->ping_timer += dtime;
1289                 if(peer->ping_timer >= 5.0)
1290                 {
1291                         // Create and send PING packet
1292                         SharedBuffer<u8> data(2);
1293                         writeU8(&data[0], TYPE_CONTROL);
1294                         writeU8(&data[1], CONTROLTYPE_PING);
1295                         SendAsPacket(peer->id, 0, data, true);
1296
1297                         peer->ping_timer = 0.0;
1298                 }
1299                 
1300 nextpeer:
1301                 continue;
1302         }
1303
1304         // Remove timed out peers
1305         core::list<u16>::Iterator i = timeouted_peers.begin();
1306         for(; i != timeouted_peers.end(); i++)
1307         {
1308                 PrintInfo(derr_con);
1309                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1310                 deletePeer(*i, true);
1311         }
1312 }
1313
1314 Peer* Connection::GetPeer(u16 peer_id)
1315 {
1316         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1317
1318         if(node == NULL){
1319                 // Peer not found
1320                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1321         }
1322
1323         // Error checking
1324         assert(node->getValue()->id == peer_id);
1325
1326         return node->getValue();
1327 }
1328
1329 Peer* Connection::GetPeerNoEx(u16 peer_id)
1330 {
1331         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1332
1333         if(node == NULL){
1334                 return NULL;
1335         }
1336
1337         // Error checking
1338         assert(node->getValue()->id == peer_id);
1339
1340         return node->getValue();
1341 }
1342
1343 core::list<Peer*> Connection::GetPeers()
1344 {
1345         core::list<Peer*> list;
1346         core::map<u16, Peer*>::Iterator j;
1347         j = m_peers.getIterator();
1348         for(; j.atEnd() == false; j++)
1349         {
1350                 Peer *peer = j.getNode()->getValue();
1351                 list.push_back(peer);
1352         }
1353         return list;
1354 }
1355
1356 bool Connection::deletePeer(u16 peer_id, bool timeout)
1357 {
1358         if(m_peers.find(peer_id) == NULL)
1359                 return false;
1360         m_peerhandler->deletingPeer(m_peers[peer_id], timeout);
1361         delete m_peers[peer_id];
1362         m_peers.remove(peer_id);
1363         return true;
1364 }
1365
1366 void Connection::PrintInfo(std::ostream &out)
1367 {
1368         out<<m_socket.GetHandle();
1369         out<<" ";
1370         out<<"con "<<m_peer_id<<": ";
1371         for(s16 i=0; i<(s16)m_indentation-1; i++)
1372                 out<<"  ";
1373 }
1374
1375 void Connection::PrintInfo()
1376 {
1377         PrintInfo(dout_con);
1378 }
1379
1380 } // namespace
1381