Middle mouse click will move 10 items around at a time.
[oweals/minetest.git] / src / connection.cpp
1 /*
2 Minetest-c55
3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23
24 namespace con
25 {
26
27 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
28                 u32 protocol_id, u16 sender_peer_id, u8 channel)
29 {
30         u32 packet_size = datasize + BASE_HEADER_SIZE;
31         BufferedPacket p(packet_size);
32         p.address = address;
33
34         writeU32(&p.data[0], protocol_id);
35         writeU16(&p.data[4], sender_peer_id);
36         writeU8(&p.data[6], channel);
37
38         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
39
40         return p;
41 }
42
43 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
44                 u32 protocol_id, u16 sender_peer_id, u8 channel)
45 {
46         return makePacket(address, *data, data.getSize(),
47                         protocol_id, sender_peer_id, channel);
48 }
49
50 SharedBuffer<u8> makeOriginalPacket(
51                 SharedBuffer<u8> data)
52 {
53         u32 header_size = 1;
54         u32 packet_size = data.getSize() + header_size;
55         SharedBuffer<u8> b(packet_size);
56
57         writeU8(&b[0], TYPE_ORIGINAL);
58
59         memcpy(&b[header_size], *data, data.getSize());
60
61         return b;
62 }
63
64 core::list<SharedBuffer<u8> > makeSplitPacket(
65                 SharedBuffer<u8> data,
66                 u32 chunksize_max,
67                 u16 seqnum)
68 {
69         // Chunk packets, containing the TYPE_SPLIT header
70         core::list<SharedBuffer<u8> > chunks;
71         
72         u32 chunk_header_size = 7;
73         u32 maximum_data_size = chunksize_max - chunk_header_size;
74         u32 start = 0;
75         u32 end = 0;
76         u32 chunk_num = 0;
77         do{
78                 end = start + maximum_data_size - 1;
79                 if(end > data.getSize() - 1)
80                         end = data.getSize() - 1;
81                 
82                 u32 payload_size = end - start + 1;
83                 u32 packet_size = chunk_header_size + payload_size;
84
85                 SharedBuffer<u8> chunk(packet_size);
86                 
87                 writeU8(&chunk[0], TYPE_SPLIT);
88                 writeU16(&chunk[1], seqnum);
89                 // [3] u16 chunk_count is written at next stage
90                 writeU16(&chunk[5], chunk_num);
91                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
92
93                 chunks.push_back(chunk);
94                 
95                 start = end + 1;
96                 chunk_num++;
97         }
98         while(end != data.getSize() - 1);
99
100         u16 chunk_count = chunks.getSize();
101
102         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
103         for(; i != chunks.end(); i++)
104         {
105                 // Write chunk_count
106                 writeU16(&((*i)[3]), chunk_count);
107         }
108
109         return chunks;
110 }
111
112 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
113                 SharedBuffer<u8> data,
114                 u32 chunksize_max,
115                 u16 &split_seqnum)
116 {
117         u32 original_header_size = 1;
118         core::list<SharedBuffer<u8> > list;
119         if(data.getSize() + original_header_size > chunksize_max)
120         {
121                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
122                 split_seqnum++;
123                 return list;
124         }
125         else
126         {
127                 list.push_back(makeOriginalPacket(data));
128         }
129         return list;
130 }
131
132 SharedBuffer<u8> makeReliablePacket(
133                 SharedBuffer<u8> data,
134                 u16 seqnum)
135 {
136         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
137         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
138                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
139         u32 header_size = 3;
140         u32 packet_size = data.getSize() + header_size;
141         SharedBuffer<u8> b(packet_size);
142
143         writeU8(&b[0], TYPE_RELIABLE);
144         writeU16(&b[1], seqnum);
145
146         memcpy(&b[header_size], *data, data.getSize());
147
148         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
149                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
150         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
151         return b;
152 }
153
154 /*
155         ReliablePacketBuffer
156 */
157
158 void ReliablePacketBuffer::print()
159 {
160         core::list<BufferedPacket>::Iterator i;
161         i = m_list.begin();
162         for(; i != m_list.end(); i++)
163         {
164                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
165                 dout_con<<s<<" ";
166         }
167 }
168 bool ReliablePacketBuffer::empty()
169 {
170         return m_list.empty();
171 }
172 u32 ReliablePacketBuffer::size()
173 {
174         return m_list.getSize();
175 }
176 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
177 {
178         core::list<BufferedPacket>::Iterator i;
179         i = m_list.begin();
180         for(; i != m_list.end(); i++)
181         {
182                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
183                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
184                                 <<", comparing to s="<<s<<std::endl;*/
185                 if(s == seqnum)
186                         break;
187         }
188         return i;
189 }
190 RPBSearchResult ReliablePacketBuffer::notFound()
191 {
192         return m_list.end();
193 }
194 u16 ReliablePacketBuffer::getFirstSeqnum()
195 {
196         if(empty())
197                 throw NotFoundException("Buffer is empty");
198         BufferedPacket p = *m_list.begin();
199         return readU16(&p.data[BASE_HEADER_SIZE+1]);
200 }
201 BufferedPacket ReliablePacketBuffer::popFirst()
202 {
203         if(empty())
204                 throw NotFoundException("Buffer is empty");
205         BufferedPacket p = *m_list.begin();
206         core::list<BufferedPacket>::Iterator i = m_list.begin();
207         m_list.erase(i);
208         return p;
209 }
210 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
211 {
212         RPBSearchResult r = findPacket(seqnum);
213         if(r == notFound()){
214                 dout_con<<"Not found"<<std::endl;
215                 throw NotFoundException("seqnum not found in buffer");
216         }
217         BufferedPacket p = *r;
218         m_list.erase(r);
219         return p;
220 }
221 void ReliablePacketBuffer::insert(BufferedPacket &p)
222 {
223         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
224         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
225         assert(type == TYPE_RELIABLE);
226         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
227
228         // Find the right place for the packet and insert it there
229
230         // If list is empty, just add it
231         if(m_list.empty())
232         {
233                 m_list.push_back(p);
234                 // Done.
235                 return;
236         }
237         // Otherwise find the right place
238         core::list<BufferedPacket>::Iterator i;
239         i = m_list.begin();
240         // Find the first packet in the list which has a higher seqnum
241         for(; i != m_list.end(); i++){
242                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
243                 if(s == seqnum){
244                         throw AlreadyExistsException("Same seqnum in list");
245                 }
246                 if(seqnum_higher(s, seqnum)){
247                         break;
248                 }
249         }
250         // If we're at the end of the list, add the packet to the
251         // end of the list
252         if(i == m_list.end())
253         {
254                 m_list.push_back(p);
255                 // Done.
256                 return;
257         }
258         // Insert before i
259         m_list.insert_before(i, p);
260 }
261
262 void ReliablePacketBuffer::incrementTimeouts(float dtime)
263 {
264         core::list<BufferedPacket>::Iterator i;
265         i = m_list.begin();
266         for(; i != m_list.end(); i++){
267                 i->time += dtime;
268                 i->totaltime += dtime;
269         }
270 }
271
272 void ReliablePacketBuffer::resetTimedOuts(float timeout)
273 {
274         core::list<BufferedPacket>::Iterator i;
275         i = m_list.begin();
276         for(; i != m_list.end(); i++){
277                 if(i->time >= timeout)
278                         i->time = 0.0;
279         }
280 }
281
282 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
283 {
284         core::list<BufferedPacket>::Iterator i;
285         i = m_list.begin();
286         for(; i != m_list.end(); i++){
287                 if(i->totaltime >= timeout)
288                         return true;
289         }
290         return false;
291 }
292
293 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
294 {
295         core::list<BufferedPacket> timed_outs;
296         core::list<BufferedPacket>::Iterator i;
297         i = m_list.begin();
298         for(; i != m_list.end(); i++)
299         {
300                 if(i->time >= timeout)
301                         timed_outs.push_back(*i);
302         }
303         return timed_outs;
304 }
305
306 /*
307         IncomingSplitBuffer
308 */
309
310 IncomingSplitBuffer::~IncomingSplitBuffer()
311 {
312         core::map<u16, IncomingSplitPacket*>::Iterator i;
313         i = m_buf.getIterator();
314         for(; i.atEnd() == false; i++)
315         {
316                 delete i.getNode()->getValue();
317         }
318 }
319 /*
320         This will throw a GotSplitPacketException when a full
321         split packet is constructed.
322 */
323 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
324 {
325         u32 headersize = BASE_HEADER_SIZE + 7;
326         assert(p.data.getSize() >= headersize);
327         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
328         assert(type == TYPE_SPLIT);
329         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
330         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
331         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
332
333         // Add if doesn't exist
334         if(m_buf.find(seqnum) == NULL)
335         {
336                 IncomingSplitPacket *sp = new IncomingSplitPacket();
337                 sp->chunk_count = chunk_count;
338                 sp->reliable = reliable;
339                 m_buf[seqnum] = sp;
340         }
341         
342         IncomingSplitPacket *sp = m_buf[seqnum];
343         
344         // TODO: These errors should be thrown or something? Dunno.
345         if(chunk_count != sp->chunk_count)
346                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
347                                 <<" != sp->chunk_count="<<sp->chunk_count
348                                 <<std::endl;
349         if(reliable != sp->reliable)
350                 derr_con<<"Connection: WARNING: reliable="<<reliable
351                                 <<" != sp->reliable="<<sp->reliable
352                                 <<std::endl;
353
354         // If chunk already exists, cancel
355         if(sp->chunks.find(chunk_num) != NULL)
356                 throw AlreadyExistsException("Chunk already in buffer");
357         
358         // Cut chunk data out of packet
359         u32 chunkdatasize = p.data.getSize() - headersize;
360         SharedBuffer<u8> chunkdata(chunkdatasize);
361         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
362         
363         // Set chunk data in buffer
364         sp->chunks[chunk_num] = chunkdata;
365         
366         // If not all chunks are received, return empty buffer
367         if(sp->allReceived() == false)
368                 return SharedBuffer<u8>();
369
370         // Calculate total size
371         u32 totalsize = 0;
372         core::map<u16, SharedBuffer<u8> >::Iterator i;
373         i = sp->chunks.getIterator();
374         for(; i.atEnd() == false; i++)
375         {
376                 totalsize += i.getNode()->getValue().getSize();
377         }
378         
379         SharedBuffer<u8> fulldata(totalsize);
380
381         // Copy chunks to data buffer
382         u32 start = 0;
383         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
384                         chunk_i++)
385         {
386                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
387                 u16 chunkdatasize = buf.getSize();
388                 memcpy(&fulldata[start], *buf, chunkdatasize);
389                 start += chunkdatasize;;
390         }
391
392         // Remove sp from buffer
393         m_buf.remove(seqnum);
394         delete sp;
395
396         return fulldata;
397 }
398 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
399 {
400         core::list<u16> remove_queue;
401         core::map<u16, IncomingSplitPacket*>::Iterator i;
402         i = m_buf.getIterator();
403         for(; i.atEnd() == false; i++)
404         {
405                 IncomingSplitPacket *p = i.getNode()->getValue();
406                 // Reliable ones are not removed by timeout
407                 if(p->reliable == true)
408                         continue;
409                 p->time += dtime;
410                 if(p->time >= timeout)
411                         remove_queue.push_back(i.getNode()->getKey());
412         }
413         core::list<u16>::Iterator j;
414         j = remove_queue.begin();
415         for(; j != remove_queue.end(); j++)
416         {
417                 dout_con<<"NOTE: Removing timed out unreliable split packet"
418                                 <<std::endl;
419                 delete m_buf[*j];
420                 m_buf.remove(*j);
421         }
422 }
423
424 /*
425         Channel
426 */
427
428 Channel::Channel()
429 {
430         next_outgoing_seqnum = SEQNUM_INITIAL;
431         next_incoming_seqnum = SEQNUM_INITIAL;
432         next_outgoing_split_seqnum = SEQNUM_INITIAL;
433 }
434 Channel::~Channel()
435 {
436 }
437
438 /*
439         Peer
440 */
441
442 Peer::Peer(u16 a_id, Address a_address)
443 {
444         id = a_id;
445         address = a_address;
446         timeout_counter = 0.0;
447         //resend_timeout = RESEND_TIMEOUT_MINIMUM;
448         ping_timer = 0.0;
449         resend_timeout = 0.5;
450         avg_rtt = -1.0;
451         has_sent_with_id = false;
452 }
453 Peer::~Peer()
454 {
455 }
456
457 void Peer::reportRTT(float rtt)
458 {
459         if(rtt < -0.999)
460         {}
461         else if(avg_rtt < 0.0)
462                 avg_rtt = rtt;
463         else
464                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
465         
466         // Calculate resend_timeout
467
468         /*int reliable_count = 0;
469         for(int i=0; i<CHANNEL_COUNT; i++)
470         {
471                 reliable_count += channels[i].outgoing_reliables.size();
472         }
473         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
474                         * ((float)reliable_count * 1);*/
475         
476         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
477         if(timeout < RESEND_TIMEOUT_MIN)
478                 timeout = RESEND_TIMEOUT_MIN;
479         if(timeout > RESEND_TIMEOUT_MAX)
480                 timeout = RESEND_TIMEOUT_MAX;
481         resend_timeout = timeout;
482 }
483                                 
484 /*
485         Connection
486 */
487
488 Connection::Connection(
489         u32 protocol_id,
490         u32 max_packet_size,
491         float timeout,
492         PeerHandler *peerhandler
493 )
494 {
495         assert(peerhandler != NULL);
496
497         m_protocol_id = protocol_id;
498         m_max_packet_size = max_packet_size;
499         m_timeout = timeout;
500         m_peer_id = PEER_ID_INEXISTENT;
501         //m_waiting_new_peer_id = false;
502         m_indentation = 0;
503         m_peerhandler = peerhandler;
504 }
505
506 Connection::~Connection()
507 {
508         // Clear peers
509         core::map<u16, Peer*>::Iterator j;
510         j = m_peers.getIterator();
511         for(; j.atEnd() == false; j++)
512         {
513                 Peer *peer = j.getNode()->getValue();
514                 delete peer;
515         }
516 }
517
518 void Connection::Serve(unsigned short port)
519 {
520         m_socket.Bind(port);
521         m_peer_id = PEER_ID_SERVER;
522 }
523
524 void Connection::Connect(Address address)
525 {
526         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
527         if(node != NULL){
528                 throw ConnectionException("Already connected to a server");
529         }
530
531         Peer *peer = new Peer(PEER_ID_SERVER, address);
532         m_peers.insert(peer->id, peer);
533         m_peerhandler->peerAdded(peer);
534         
535         m_socket.Bind(0);
536         
537         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
538         m_peer_id = PEER_ID_INEXISTENT;
539         SharedBuffer<u8> data(0);
540         Send(PEER_ID_SERVER, 0, data, true);
541
542         //m_waiting_new_peer_id = true;
543 }
544
545 void Connection::Disconnect()
546 {
547         // Create and send DISCO packet
548         SharedBuffer<u8> data(2);
549         writeU8(&data[0], TYPE_CONTROL);
550         writeU8(&data[1], CONTROLTYPE_DISCO);
551         
552         // Send to all
553         core::map<u16, Peer*>::Iterator j;
554         j = m_peers.getIterator();
555         for(; j.atEnd() == false; j++)
556         {
557                 Peer *peer = j.getNode()->getValue();
558                 SendAsPacket(peer->id, 0, data, false);
559         }
560 }
561
562 bool Connection::Connected()
563 {
564         if(m_peers.size() != 1)
565                 return false;
566                 
567         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
568         if(node == NULL)
569                 return false;
570         
571         if(m_peer_id == PEER_ID_INEXISTENT)
572                 return false;
573         
574         return true;
575 }
576
577 SharedBuffer<u8> Channel::ProcessPacket(
578                 SharedBuffer<u8> packetdata,
579                 Connection *con,
580                 u16 peer_id,
581                 u8 channelnum,
582                 bool reliable)
583 {
584         IndentationRaiser iraiser(&(con->m_indentation));
585
586         if(packetdata.getSize() < 1)
587                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
588
589         u8 type = readU8(&packetdata[0]);
590         
591         if(type == TYPE_CONTROL)
592         {
593                 if(packetdata.getSize() < 2)
594                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
595
596                 u8 controltype = readU8(&packetdata[1]);
597
598                 if(controltype == CONTROLTYPE_ACK)
599                 {
600                         if(packetdata.getSize() < 4)
601                                 throw InvalidIncomingDataException
602                                                 ("packetdata.getSize() < 4 (ACK header size)");
603
604                         u16 seqnum = readU16(&packetdata[2]);
605                         con->PrintInfo();
606                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
607                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
608                                         <<", seqnum="<<seqnum<<std::endl;
609
610                         try{
611                                 BufferedPacket p = outgoing_reliables.popSeqnum(seqnum);
612                                 // Get round trip time
613                                 float rtt = p.totaltime;
614
615                                 // Let peer calculate stuff according to it
616                                 // (avg_rtt and resend_timeout)
617                                 Peer *peer = con->GetPeer(peer_id);
618                                 peer->reportRTT(rtt);
619
620                                 //con->PrintInfo(dout_con);
621                                 //dout_con<<"RTT = "<<rtt<<std::endl;
622
623                                 /*dout_con<<"OUTGOING: ";
624                                 con->PrintInfo();
625                                 outgoing_reliables.print();
626                                 dout_con<<std::endl;*/
627                         }
628                         catch(NotFoundException &e){
629                                 con->PrintInfo(derr_con);
630                                 derr_con<<"WARNING: ACKed packet not "
631                                                 "in outgoing queue"
632                                                 <<std::endl;
633                         }
634
635                         throw ProcessedSilentlyException("Got an ACK");
636                 }
637                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
638                 {
639                         if(packetdata.getSize() < 4)
640                                 throw InvalidIncomingDataException
641                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
642                         u16 peer_id_new = readU16(&packetdata[2]);
643                         con->PrintInfo();
644                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
645
646                         if(con->GetPeerID() != PEER_ID_INEXISTENT)
647                         {
648                                 con->PrintInfo(derr_con);
649                                 derr_con<<"WARNING: Not changing"
650                                                 " existing peer id."<<std::endl;
651                         }
652                         else
653                         {
654                                 dout_con<<"changing."<<std::endl;
655                                 con->SetPeerID(peer_id_new);
656                         }
657                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
658                 }
659                 else if(controltype == CONTROLTYPE_PING)
660                 {
661                         // Just ignore it, the incoming data already reset
662                         // the timeout counter
663                         con->PrintInfo();
664                         dout_con<<"PING"<<std::endl;
665                         throw ProcessedSilentlyException("Got a PING");
666                 }
667                 else if(controltype == CONTROLTYPE_DISCO)
668                 {
669                         // Just ignore it, the incoming data already reset
670                         // the timeout counter
671                         con->PrintInfo();
672                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
673                         
674                         if(con->deletePeer(peer_id, false) == false)
675                         {
676                                 con->PrintInfo(derr_con);
677                                 derr_con<<"DISCO: Peer not found"<<std::endl;
678                         }
679
680                         throw ProcessedSilentlyException("Got a DISCO");
681                 }
682                 else{
683                         con->PrintInfo(derr_con);
684                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
685                                         <<((int)controltype&0xff)<<std::endl;
686                         throw InvalidIncomingDataException("Invalid control type");
687                 }
688         }
689         else if(type == TYPE_ORIGINAL)
690         {
691                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
692                         throw InvalidIncomingDataException
693                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
694                 con->PrintInfo();
695                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
696                                 <<std::endl;
697                 // Get the inside packet out and return it
698                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
699                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
700                 return payload;
701         }
702         else if(type == TYPE_SPLIT)
703         {
704                 // We have to create a packet again for buffering
705                 // This isn't actually too bad an idea.
706                 BufferedPacket packet = makePacket(
707                                 con->GetPeer(peer_id)->address,
708                                 packetdata,
709                                 con->GetProtocolID(),
710                                 peer_id,
711                                 channelnum);
712                 // Buffer the packet
713                 SharedBuffer<u8> data = incoming_splits.insert(packet, reliable);
714                 if(data.getSize() != 0)
715                 {
716                         con->PrintInfo();
717                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
718                                         <<"size="<<data.getSize()<<std::endl;
719                         return data;
720                 }
721                 con->PrintInfo();
722                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
723                 throw ProcessedSilentlyException("Buffered a split packet chunk");
724         }
725         else if(type == TYPE_RELIABLE)
726         {
727                 // Recursive reliable packets not allowed
728                 assert(reliable == false);
729
730                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
731                         throw InvalidIncomingDataException
732                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
733
734                 u16 seqnum = readU16(&packetdata[1]);
735
736                 bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum);
737                 bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum);
738                 
739                 con->PrintInfo();
740                 if(is_future_packet)
741                         dout_con<<"BUFFERING";
742                 else if(is_old_packet)
743                         dout_con<<"OLD";
744                 else
745                         dout_con<<"RECUR";
746                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
747                                 <<" next="<<next_incoming_seqnum;
748                 dout_con<<" [sending CONTROLTYPE_ACK"
749                                 " to peer_id="<<peer_id<<"]";
750                 dout_con<<std::endl;
751                 
752                 //DEBUG
753                 //assert(incoming_reliables.size() < 100);
754
755                 // Send a CONTROLTYPE_ACK
756                 SharedBuffer<u8> reply(4);
757                 writeU8(&reply[0], TYPE_CONTROL);
758                 writeU8(&reply[1], CONTROLTYPE_ACK);
759                 writeU16(&reply[2], seqnum);
760                 con->SendAsPacket(peer_id, channelnum, reply, false);
761
762                 //if(seqnum_higher(seqnum, next_incoming_seqnum))
763                 if(is_future_packet)
764                 {
765                         /*con->PrintInfo();
766                         dout_con<<"Buffering reliable packet (seqnum="
767                                         <<seqnum<<")"<<std::endl;*/
768                         
769                         // This one comes later, buffer it.
770                         // Actually we have to make a packet to buffer one.
771                         // Well, we have all the ingredients, so just do it.
772                         BufferedPacket packet = makePacket(
773                                         con->GetPeer(peer_id)->address,
774                                         packetdata,
775                                         con->GetProtocolID(),
776                                         peer_id,
777                                         channelnum);
778                         try{
779                                 incoming_reliables.insert(packet);
780                                 
781                                 /*con->PrintInfo();
782                                 dout_con<<"INCOMING: ";
783                                 incoming_reliables.print();
784                                 dout_con<<std::endl;*/
785                         }
786                         catch(AlreadyExistsException &e)
787                         {
788                         }
789
790                         throw ProcessedSilentlyException("Buffered future reliable packet");
791                 }
792                 //else if(seqnum_higher(next_incoming_seqnum, seqnum))
793                 else if(is_old_packet)
794                 {
795                         // An old packet, dump it
796                         throw InvalidIncomingDataException("Got an old reliable packet");
797                 }
798
799                 next_incoming_seqnum++;
800
801                 // Get out the inside packet and re-process it
802                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
803                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
804
805                 return ProcessPacket(payload, con, peer_id, channelnum, true);
806         }
807         else
808         {
809                 con->PrintInfo(derr_con);
810                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
811                 throw InvalidIncomingDataException("Invalid packet type");
812         }
813         
814         // We should never get here.
815         // If you get here, add an exception or a return to some of the
816         // above conditionals.
817         assert(0);
818         throw BaseException("Error in Channel::ProcessPacket()");
819 }
820
821 SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con,
822                 u16 &peer_id)
823 {
824         u16 firstseqnum = 0;
825         // Clear old packets from start of buffer
826         try{
827         for(;;){
828                 firstseqnum = incoming_reliables.getFirstSeqnum();
829                 if(seqnum_higher(next_incoming_seqnum, firstseqnum))
830                         incoming_reliables.popFirst();
831                 else
832                         break;
833         }
834         // This happens if all packets are old
835         }catch(con::NotFoundException)
836         {}
837         
838         if(incoming_reliables.empty() == false)
839         {
840                 if(firstseqnum == next_incoming_seqnum)
841                 {
842                         BufferedPacket p = incoming_reliables.popFirst();
843                         
844                         peer_id = readPeerId(*p.data);
845                         u8 channelnum = readChannel(*p.data);
846                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
847
848                         con->PrintInfo();
849                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
850                                         <<" seqnum="<<seqnum
851                                         <<" peer_id="<<peer_id
852                                         <<" channel="<<((int)channelnum&0xff)
853                                         <<std::endl;
854
855                         next_incoming_seqnum++;
856                         
857                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
858                         // Get out the inside packet and re-process it
859                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
860                         memcpy(*payload, &p.data[headers_size], payload.getSize());
861
862                         return ProcessPacket(payload, con, peer_id, channelnum, true);
863                 }
864         }
865                 
866         throw NoIncomingDataException("No relevant data in buffers");
867 }
868
869 SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id)
870 {
871         core::map<u16, Peer*>::Iterator j;
872         j = m_peers.getIterator();
873         for(; j.atEnd() == false; j++)
874         {
875                 Peer *peer = j.getNode()->getValue();
876                 for(u16 i=0; i<CHANNEL_COUNT; i++)
877                 {
878                         Channel *channel = &peer->channels[i];
879                         try{
880                                 SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers
881                                                 (this, peer_id);
882
883                                 return resultdata;
884                         }
885                         catch(NoIncomingDataException &e)
886                         {
887                         }
888                         catch(InvalidIncomingDataException &e)
889                         {
890                         }
891                         catch(ProcessedSilentlyException &e)
892                         {
893                         }
894                 }
895         }
896         throw NoIncomingDataException("No relevant data in buffers");
897 }
898
899 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
900 {
901         /*
902                 Receive a packet from the network
903         */
904         
905         // TODO: We can not know how many layers of header there are.
906         // For now, just assume there are no other than the base headers.
907         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
908         Buffer<u8> packetdata(packet_maxsize);
909         
910         for(;;)
911         {
912         try
913         {
914                 /*
915                         Check if some buffer has relevant data
916                 */
917                 try{
918                         SharedBuffer<u8> resultdata = GetFromBuffers(peer_id);
919
920                         if(datasize < resultdata.getSize())
921                                 throw InvalidIncomingDataException
922                                                 ("Buffer too small for received data");
923                                 
924                         memcpy(data, *resultdata, resultdata.getSize());
925                         return resultdata.getSize();
926                 }
927                 catch(NoIncomingDataException &e)
928                 {
929                 }
930         
931                 Address sender;
932
933                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
934
935                 if(received_size < 0)
936                         throw NoIncomingDataException("No incoming data");
937                 if(received_size < BASE_HEADER_SIZE)
938                         throw InvalidIncomingDataException("No full header received");
939                 if(readU32(&packetdata[0]) != m_protocol_id)
940                         throw InvalidIncomingDataException("Invalid protocol id");
941                 
942                 peer_id = readPeerId(*packetdata);
943                 u8 channelnum = readChannel(*packetdata);
944                 if(channelnum > CHANNEL_COUNT-1){
945                         PrintInfo(derr_con);
946                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
947                         throw InvalidIncomingDataException("Channel doesn't exist");
948                 }
949
950                 if(peer_id == PEER_ID_INEXISTENT)
951                 {
952                         /*
953                                 Somebody is trying to send stuff to us with no peer id.
954                                 
955                                 Check if the same address and port was added to our peer
956                                 list before.
957                                 Allow only entries that have has_sent_with_id==false.
958                         */
959
960                         core::map<u16, Peer*>::Iterator j;
961                         j = m_peers.getIterator();
962                         for(; j.atEnd() == false; j++)
963                         {
964                                 Peer *peer = j.getNode()->getValue();
965                                 if(peer->has_sent_with_id)
966                                         continue;
967                                 if(peer->address == sender)
968                                         break;
969                         }
970                         
971                         /*
972                                 If no peer was found with the same address and port,
973                                 we shall assume it is a new peer and create an entry.
974                         */
975                         if(j.atEnd())
976                         {
977                                 // Pass on to adding the peer
978                         }
979                         // Else: A peer was found.
980                         else
981                         {
982                                 Peer *peer = j.getNode()->getValue();
983                                 peer_id = peer->id;
984                                 PrintInfo(derr_con);
985                                 derr_con<<"WARNING: Assuming unknown peer to be "
986                                                 <<"peer_id="<<peer_id<<std::endl;
987                         }
988                 }
989                 
990                 /*
991                         The peer was not found in our lists. Add it.
992                 */
993                 if(peer_id == PEER_ID_INEXISTENT)
994                 {
995                         // Somebody wants to make a new connection
996
997                         // Get a unique peer id (2 or higher)
998                         u16 peer_id_new = 2;
999                         /*
1000                                 Find an unused peer id
1001                         */
1002                         for(;;)
1003                         {
1004                                 // Check if exists
1005                                 if(m_peers.find(peer_id_new) == NULL)
1006                                         break;
1007                                 // Check for overflow
1008                                 if(peer_id_new == 65535)
1009                                         throw ConnectionException
1010                                                 ("Connection ran out of peer ids");
1011                                 peer_id_new++;
1012                         }
1013
1014                         PrintInfo();
1015                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
1016                                         " giving peer_id="<<peer_id_new<<std::endl;
1017
1018                         // Create a peer
1019                         Peer *peer = new Peer(peer_id_new, sender);
1020                         m_peers.insert(peer->id, peer);
1021                         m_peerhandler->peerAdded(peer);
1022                         
1023                         // Create CONTROL packet to tell the peer id to the new peer.
1024                         SharedBuffer<u8> reply(4);
1025                         writeU8(&reply[0], TYPE_CONTROL);
1026                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
1027                         writeU16(&reply[2], peer_id_new);
1028                         SendAsPacket(peer_id_new, 0, reply, true);
1029                         
1030                         // We're now talking to a valid peer_id
1031                         peer_id = peer_id_new;
1032
1033                         // Go on and process whatever it sent
1034                 }
1035
1036                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1037
1038                 if(node == NULL)
1039                 {
1040                         // Peer not found
1041                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
1042                         // and it is invalid.
1043                         PrintInfo(derr_con);
1044                         derr_con<<"Receive(): Peer not found"<<std::endl;
1045                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
1046                 }
1047
1048                 Peer *peer = node->getValue();
1049
1050                 // Validate peer address
1051                 if(peer->address != sender)
1052                 {
1053                         PrintInfo(derr_con);
1054                         derr_con<<"Peer "<<peer_id<<" sending from different address."
1055                                         " Ignoring."<<std::endl;
1056                         throw InvalidIncomingDataException
1057                                         ("Peer sending from different address");
1058                         /*// If there is more data, receive again
1059                         if(m_socket.WaitData(0) == true)
1060                                 continue;
1061                         throw NoIncomingDataException("No incoming data (2)");*/
1062                 }
1063                 
1064                 peer->timeout_counter = 0.0;
1065
1066                 Channel *channel = &(peer->channels[channelnum]);
1067                 
1068                 // Throw the received packet to channel->processPacket()
1069
1070                 // Make a new SharedBuffer from the data without the base headers
1071                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1072                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1073                                 strippeddata.getSize());
1074                 
1075                 try{
1076                         // Process it (the result is some data with no headers made by us)
1077                         SharedBuffer<u8> resultdata = channel->ProcessPacket
1078                                         (strippeddata, this, peer_id, channelnum);
1079                         
1080                         PrintInfo();
1081                         dout_con<<"ProcessPacket returned data of size "
1082                                         <<resultdata.getSize()<<std::endl;
1083                         
1084                         if(datasize < resultdata.getSize())
1085                                 throw InvalidIncomingDataException
1086                                                 ("Buffer too small for received data");
1087                         
1088                         memcpy(data, *resultdata, resultdata.getSize());
1089                         return resultdata.getSize();
1090                 }
1091                 catch(ProcessedSilentlyException &e)
1092                 {
1093                         // If there is more data, receive again
1094                         if(m_socket.WaitData(0) == true)
1095                                 continue;
1096                 }
1097                 throw NoIncomingDataException("No incoming data (2)");
1098         } // try
1099         catch(InvalidIncomingDataException &e)
1100         {
1101                 // If there is more data, receive again
1102                 if(m_socket.WaitData(0) == true)
1103                         continue;
1104         }
1105         } // for
1106 }
1107
1108 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1109 {
1110         core::map<u16, Peer*>::Iterator j;
1111         j = m_peers.getIterator();
1112         for(; j.atEnd() == false; j++)
1113         {
1114                 Peer *peer = j.getNode()->getValue();
1115                 Send(peer->id, channelnum, data, reliable);
1116         }
1117 }
1118
1119 void Connection::Send(u16 peer_id, u8 channelnum,
1120                 SharedBuffer<u8> data, bool reliable)
1121 {
1122         assert(channelnum < CHANNEL_COUNT);
1123         
1124         Peer *peer = GetPeer(peer_id);
1125         Channel *channel = &(peer->channels[channelnum]);
1126
1127         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1128         if(reliable)
1129                 chunksize_max -= RELIABLE_HEADER_SIZE;
1130
1131         core::list<SharedBuffer<u8> > originals;
1132         originals = makeAutoSplitPacket(data, chunksize_max,
1133                         channel->next_outgoing_split_seqnum);
1134         
1135         core::list<SharedBuffer<u8> >::Iterator i;
1136         i = originals.begin();
1137         for(; i != originals.end(); i++)
1138         {
1139                 SharedBuffer<u8> original = *i;
1140                 
1141                 SendAsPacket(peer_id, channelnum, original, reliable);
1142         }
1143 }
1144
1145 void Connection::SendAsPacket(u16 peer_id, u8 channelnum,
1146                 SharedBuffer<u8> data, bool reliable)
1147 {
1148         Peer *peer = GetPeer(peer_id);
1149         Channel *channel = &(peer->channels[channelnum]);
1150
1151         if(reliable)
1152         {
1153                 u16 seqnum = channel->next_outgoing_seqnum;
1154                 channel->next_outgoing_seqnum++;
1155
1156                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1157
1158                 // Add base headers and make a packet
1159                 BufferedPacket p = makePacket(peer->address, reliable,
1160                                 m_protocol_id, m_peer_id, channelnum);
1161                 
1162                 try{
1163                         // Buffer the packet
1164                         channel->outgoing_reliables.insert(p);
1165                 }
1166                 catch(AlreadyExistsException &e)
1167                 {
1168                         PrintInfo(derr_con);
1169                         derr_con<<"WARNING: Going to send a reliable packet "
1170                                         "seqnum="<<seqnum<<" that is already "
1171                                         "in outgoing buffer"<<std::endl;
1172                         //assert(0);
1173                 }
1174                 
1175                 // Send the packet
1176                 RawSend(p);
1177         }
1178         else
1179         {
1180                 // Add base headers and make a packet
1181                 BufferedPacket p = makePacket(peer->address, data,
1182                                 m_protocol_id, m_peer_id, channelnum);
1183
1184                 // Send the packet
1185                 RawSend(p);
1186         }
1187 }
1188
1189 void Connection::RawSend(const BufferedPacket &packet)
1190 {
1191         m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1192 }
1193
1194 void Connection::RunTimeouts(float dtime)
1195 {
1196         core::list<u16> timeouted_peers;
1197         core::map<u16, Peer*>::Iterator j;
1198         j = m_peers.getIterator();
1199         for(; j.atEnd() == false; j++)
1200         {
1201                 Peer *peer = j.getNode()->getValue();
1202                 
1203                 /*
1204                         Check peer timeout
1205                 */
1206                 peer->timeout_counter += dtime;
1207                 if(peer->timeout_counter > m_timeout)
1208                 {
1209                         PrintInfo(derr_con);
1210                         derr_con<<"RunTimeouts(): Peer "<<peer->id
1211                                         <<" has timed out."
1212                                         <<" (source=peer->timeout_counter)"
1213                                         <<std::endl;
1214                         // Add peer to the list
1215                         timeouted_peers.push_back(peer->id);
1216                         // Don't bother going through the buffers of this one
1217                         continue;
1218                 }
1219
1220                 float resend_timeout = peer->resend_timeout;
1221                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1222                 {
1223                         core::list<BufferedPacket> timed_outs;
1224                         core::list<BufferedPacket>::Iterator j;
1225                         
1226                         Channel *channel = &peer->channels[i];
1227
1228                         // Remove timed out incomplete unreliable split packets
1229                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
1230                         
1231                         // Increment reliable packet times
1232                         channel->outgoing_reliables.incrementTimeouts(dtime);
1233
1234                         // Check reliable packet total times, remove peer if
1235                         // over timeout.
1236                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
1237                         {
1238                                 PrintInfo(derr_con);
1239                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
1240                                                 <<" has timed out."
1241                                                 <<" (source=reliable packet totaltime)"
1242                                                 <<std::endl;
1243                                 // Add peer to the to-be-removed list
1244                                 timeouted_peers.push_back(peer->id);
1245                                 goto nextpeer;
1246                         }
1247
1248                         // Re-send timed out outgoing reliables
1249                         
1250                         timed_outs = channel->
1251                                         outgoing_reliables.getTimedOuts(resend_timeout);
1252
1253                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
1254
1255                         j = timed_outs.begin();
1256                         for(; j != timed_outs.end(); j++)
1257                         {
1258                                 u16 peer_id = readPeerId(*(j->data));
1259                                 u8 channel = readChannel(*(j->data));
1260                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
1261
1262                                 PrintInfo(derr_con);
1263                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
1264                                 j->address.print(&derr_con);
1265                                 derr_con<<"(t/o="<<resend_timeout<<"): "
1266                                                 <<"from_peer_id="<<peer_id
1267                                                 <<", channel="<<((int)channel&0xff)
1268                                                 <<", seqnum="<<seqnum
1269                                                 <<std::endl;
1270
1271                                 RawSend(*j);
1272
1273                                 // Enlarge avg_rtt and resend_timeout:
1274                                 // The rtt will be at least the timeout.
1275                                 // NOTE: This won't affect the timeout of the next
1276                                 // checked channel because it was cached.
1277                                 peer->reportRTT(resend_timeout);
1278                         }
1279                 }
1280                 
1281                 /*
1282                         Send pings
1283                 */
1284                 peer->ping_timer += dtime;
1285                 if(peer->ping_timer >= 5.0)
1286                 {
1287                         // Create and send PING packet
1288                         SharedBuffer<u8> data(2);
1289                         writeU8(&data[0], TYPE_CONTROL);
1290                         writeU8(&data[1], CONTROLTYPE_PING);
1291                         SendAsPacket(peer->id, 0, data, true);
1292
1293                         peer->ping_timer = 0.0;
1294                 }
1295                 
1296 nextpeer:
1297                 continue;
1298         }
1299
1300         // Remove timed out peers
1301         core::list<u16>::Iterator i = timeouted_peers.begin();
1302         for(; i != timeouted_peers.end(); i++)
1303         {
1304                 PrintInfo(derr_con);
1305                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1306                 deletePeer(*i, true);
1307         }
1308 }
1309
1310 Peer* Connection::GetPeer(u16 peer_id)
1311 {
1312         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1313
1314         if(node == NULL){
1315                 // Peer not found
1316                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1317         }
1318
1319         // Error checking
1320         assert(node->getValue()->id == peer_id);
1321
1322         return node->getValue();
1323 }
1324
1325 Peer* Connection::GetPeerNoEx(u16 peer_id)
1326 {
1327         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1328
1329         if(node == NULL){
1330                 return NULL;
1331         }
1332
1333         // Error checking
1334         assert(node->getValue()->id == peer_id);
1335
1336         return node->getValue();
1337 }
1338
1339 core::list<Peer*> Connection::GetPeers()
1340 {
1341         core::list<Peer*> list;
1342         core::map<u16, Peer*>::Iterator j;
1343         j = m_peers.getIterator();
1344         for(; j.atEnd() == false; j++)
1345         {
1346                 Peer *peer = j.getNode()->getValue();
1347                 list.push_back(peer);
1348         }
1349         return list;
1350 }
1351
1352 bool Connection::deletePeer(u16 peer_id, bool timeout)
1353 {
1354         if(m_peers.find(peer_id) == NULL)
1355                 return false;
1356         m_peerhandler->deletingPeer(m_peers[peer_id], timeout);
1357         delete m_peers[peer_id];
1358         m_peers.remove(peer_id);
1359         return true;
1360 }
1361
1362 void Connection::PrintInfo(std::ostream &out)
1363 {
1364         out<<m_socket.GetHandle();
1365         out<<" ";
1366         out<<"con "<<m_peer_id<<": ";
1367         for(s16 i=0; i<(s16)m_indentation-1; i++)
1368                 out<<"  ";
1369 }
1370
1371 void Connection::PrintInfo()
1372 {
1373         PrintInfo(dout_con);
1374 }
1375
1376 } // namespace
1377