3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
// Builds a BufferedPacket of datasize + BASE_HEADER_SIZE bytes: the base
// header first, then a copy of the payload.
// NOTE(review): neither a use of address nor a return statement is visible in
// this extract - presumably both sit on lines that are missing here; confirm.
27 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
28 u32 protocol_id, u16 sender_peer_id, u8 channel)
30 u32 packet_size = datasize + BASE_HEADER_SIZE;
31 BufferedPacket p(packet_size);
// Base header fields: u32 protocol id at byte 0, u16 sender peer id at byte 4,
// u8 channel at byte 6.
34 writeU32(&p.data[0], protocol_id);
35 writeU16(&p.data[4], sender_peer_id);
36 writeU8(&p.data[6], channel);
// The payload is copied directly after the base header.
38 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
// Convenience overload: forwards the buffer pointer (*data) and its size to
// the raw-pointer makePacket overload with the same header arguments.
43 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
44 u32 protocol_id, u16 sender_peer_id, u8 channel)
46 return makePacket(address, *data, data.getSize(),
47 protocol_id, sender_peer_id, channel);
// Wraps data in a TYPE_ORIGINAL packet: a buffer of data.getSize() +
// header_size bytes with the type byte at offset 0 and the payload copied
// after the header.
// NOTE(review): the declaration of header_size and the return statement are
// not visible in this extract.
50 SharedBuffer<u8> makeOriginalPacket(
51 SharedBuffer<u8> data)
54 u32 packet_size = data.getSize() + header_size;
55 SharedBuffer<u8> b(packet_size);
57 writeU8(&b[0], TYPE_ORIGINAL);
59 memcpy(&b[header_size], *data, data.getSize());
// Splits data into TYPE_SPLIT chunks and collects them in a list.
// Chunk layout as written below: u8 TYPE_SPLIT at byte 0, u16 seqnum at
// byte 1, u16 chunk_count at byte 3 (filled in by the second pass), u16
// chunk_num at byte 5, then up to maximum_data_size payload bytes.
// NOTE(review): the remaining parameters (chunksize_max and seqnum are used
// below), the declarations of start, end and chunk_num, the loop header and the
// return statement are not visible in this extract.
// NOTE(review): data.getSize() - 1 presumably wraps around for an empty
// buffer if getSize() is unsigned, and chunksize_max values of 7 or less would
// make maximum_data_size wrap or become zero - confirm the callers rule both
// out.
64 core::list<SharedBuffer<u8> > makeSplitPacket(
65 SharedBuffer<u8> data,
69 // Chunk packets, containing the TYPE_SPLIT header
70 core::list<SharedBuffer<u8> > chunks;
72 u32 chunk_header_size = 7;
73 u32 maximum_data_size = chunksize_max - chunk_header_size;
// start and end are treated as inclusive byte indices into data; end is
// clamped to the last valid index.
78 end = start + maximum_data_size - 1;
79 if(end > data.getSize() - 1)
80 end = data.getSize() - 1;
82 u32 payload_size = end - start + 1;
83 u32 packet_size = chunk_header_size + payload_size;
85 SharedBuffer<u8> chunk(packet_size);
87 writeU8(&chunk[0], TYPE_SPLIT);
88 writeU16(&chunk[1], seqnum);
89 // [3] u16 chunk_count is written at next stage
90 writeU16(&chunk[5], chunk_num);
91 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
93 chunks.push_back(chunk);
// The loop ends once the chunk just produced reached the last byte of data.
98 while(end != data.getSize() - 1);
// Second pass: the total is only known now, so patch it into every chunk.
// NOTE(review): the count is narrowed to u16 here; more than 65535 chunks
// would be truncated.
100 u16 chunk_count = chunks.getSize();
102 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
103 for(; i != chunks.end(); i++)
106 writeU16(&((*i)[3]), chunk_count);
// Produces the packet list for data: when the payload plus the 1-byte original
// header exceeds chunksize_max it is split via makeSplitPacket, and a single
// TYPE_ORIGINAL packet is pushed to the list by makeOriginalPacket.
// NOTE(review): the else separating the two paths, the remaining parameters
// (chunksize_max and split_seqnum are used below) and the return statement are
// not visible in this extract - presumably the push_back is the else branch.
112 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
113 SharedBuffer<u8> data,
117 u32 original_header_size = 1;
118 core::list<SharedBuffer<u8> > list;
119 if(data.getSize() + original_header_size > chunksize_max)
121 list = makeSplitPacket(data, chunksize_max, split_seqnum);
127 list.push_back(makeOriginalPacket(data));
// Wraps data in a TYPE_RELIABLE packet: type byte at offset 0, u16 seqnum at
// offset 1, then the payload copied at offset header_size.
// NOTE(review): the seqnum parameter, the declaration of header_size and the
// return statement are not visible in this extract; the two writes cover bytes
// 0 to 2, so header_size is presumably 3 - confirm.
132 SharedBuffer<u8> makeReliablePacket(
133 SharedBuffer<u8> data,
136 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
137 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
138 <<((unsigned int)data[0]&0xff)<<std::endl;*/
140 u32 packet_size = data.getSize() + header_size;
141 SharedBuffer<u8> b(packet_size);
143 writeU8(&b[0], TYPE_RELIABLE);
144 writeU16(&b[1], seqnum);
146 memcpy(&b[header_size], *data, data.getSize());
148 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
149 <<((unsigned int)data[0]&0xff)<<std::endl;*/
150 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
// Walks m_list and reads the u16 sequence number stored at
// BASE_HEADER_SIZE+1 in each buffered packet.
// NOTE(review): the iterator initialisation and whatever is done with s
// (presumably printing it) are not visible in this extract.
158 void ReliablePacketBuffer::print()
160 core::list<BufferedPacket>::Iterator i;
162 for(; i != m_list.end(); i++)
164 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
// Returns true when no packets are held in m_list.
168 bool ReliablePacketBuffer::empty()
170 return m_list.empty();
// Returns the number of packets currently held in m_list.
172 u32 ReliablePacketBuffer::size()
174 return m_list.getSize();
// Scans m_list, reading the u16 sequence number at BASE_HEADER_SIZE+1 of each
// packet while looking for seqnum.
// NOTE(review): the iterator initialisation, the comparison against seqnum and
// the return statements are not visible in this extract.
176 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
178 core::list<BufferedPacket>::Iterator i;
180 for(; i != m_list.end(); i++)
182 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
183 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
184 <<", comparing to s="<<s<<std::endl;*/
// NOTE(review): the body is not visible in this extract; presumably it returns
// the sentinel result that findPacket callers compare against - confirm.
190 RPBSearchResult ReliablePacketBuffer::notFound()
// Returns the u16 sequence number stored at BASE_HEADER_SIZE+1 of the first
// packet in m_list. Throws NotFoundException with the message below; the guard
// condition is not visible here but is presumably an empty-list check.
// NOTE(review): the first packet is copied by value just to read two bytes.
194 u16 ReliablePacketBuffer::getFirstSeqnum()
197 throw NotFoundException("Buffer is empty");
198 BufferedPacket p = *m_list.begin();
199 return readU16(&p.data[BASE_HEADER_SIZE+1]);
// Takes a copy of the first packet in m_list and obtains an iterator to it.
// Throws NotFoundException with the message below; the guard condition is not
// visible here but is presumably an empty-list check.
// NOTE(review): the erase of the list entry and the return of p are not
// visible in this extract.
201 BufferedPacket ReliablePacketBuffer::popFirst()
204 throw NotFoundException("Buffer is empty");
205 BufferedPacket p = *m_list.begin();
206 core::list<BufferedPacket>::Iterator i = m_list.begin();
// Looks up the packet carrying seqnum via findPacket and copies it out.
// Logs and throws NotFoundException when the lookup fails; the failure test is
// not visible here.
// NOTE(review): the erase of the list entry and the return of p are not
// visible in this extract.
210 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
212 RPBSearchResult r = findPacket(seqnum);
214 dout_con<<"Not found"<<std::endl;
215 throw NotFoundException("seqnum not found in buffer");
217 BufferedPacket p = *r;
// Inserts a TYPE_RELIABLE packet into m_list at the position chosen by its
// sequence number: before the first stored packet for which
// seqnum_higher(s, seqnum) holds, or at the end when no such packet exists.
// Throws AlreadyExistsException for a duplicate sequence number (the equality
// test itself is not visible in this extract).
// NOTE(review): the size and type preconditions are enforced with assert only.
221 void ReliablePacketBuffer::insert(BufferedPacket &p)
223 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
224 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
225 assert(type == TYPE_RELIABLE);
226 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
228 // Find the right place for the packet and insert it there
230 // If list is empty, just add it
237 // Otherwise find the right place
238 core::list<BufferedPacket>::Iterator i;
240 // Find the first packet in the list which has a higher seqnum
241 for(; i != m_list.end(); i++){
242 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
244 throw AlreadyExistsException("Same seqnum in list");
246 if(seqnum_higher(s, seqnum)){
250 // If we're at the end of the list, add the packet to the
252 if(i == m_list.end())
259 m_list.insert_before(i, p);
// Adds dtime to the totaltime of every buffered packet.
// NOTE(review): the iterator initialisation is not visible, and other per-packet
// updates may sit on lines missing from this extract.
262 void ReliablePacketBuffer::incrementTimeouts(float dtime)
264 core::list<BufferedPacket>::Iterator i;
266 for(; i != m_list.end(); i++){
268 i->totaltime += dtime;
// Visits every buffered packet whose time field has reached timeout.
// NOTE(review): the action taken for those packets is not visible in this
// extract; going by the function name it presumably resets time - confirm.
272 void ReliablePacketBuffer::resetTimedOuts(float timeout)
274 core::list<BufferedPacket>::Iterator i;
276 for(; i != m_list.end(); i++){
277 if(i->time >= timeout)
// Tests each buffered packet for totaltime >= timeout.
// NOTE(review): the return statements are not visible in this extract;
// presumably true on the first match and false otherwise - confirm.
282 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
284 core::list<BufferedPacket>::Iterator i;
286 for(; i != m_list.end(); i++){
287 if(i->totaltime >= timeout)
// Collects copies of all buffered packets whose time field has reached
// timeout into a new list; m_list itself is not modified by the visible lines.
// NOTE(review): the iterator initialisation and the return statement are not
// visible in this extract.
293 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
295 core::list<BufferedPacket> timed_outs;
296 core::list<BufferedPacket>::Iterator i;
298 for(; i != m_list.end(); i++)
300 if(i->time >= timeout)
301 timed_outs.push_back(*i);
// Deletes every IncomingSplitPacket pointer stored as a value in m_buf; the
// map holds raw pointers, so they are released here by hand.
310 IncomingSplitBuffer::~IncomingSplitBuffer()
312 core::map<u16, IncomingSplitPacket*>::Iterator i;
313 i = m_buf.getIterator();
314 for(; i.atEnd() == false; i++)
316 delete i.getNode()->getValue();
320 This will throw a GotSplitPacketException when a full
321 split packet is constructed.
// Stores one TYPE_SPLIT chunk under its sequence number. Once
// sp->allReceived() stops reporting false, the chunks 0..chunk_count-1 are
// concatenated into one buffer, the entry is removed from m_buf and the data
// is delivered by throwing GotSplitPacketException.
// Throws AlreadyExistsException when chunk_num is already stored.
// Split header fields read after the base header: u8 type at +0, u16 seqnum
// at +1, u16 chunk_count at +3, u16 chunk_num at +5.
// NOTE(review): in the visible lines chunk_num is never checked against
// chunk_count, and a mismatching chunk_count or reliable flag is only logged.
323 void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
325 u32 headersize = BASE_HEADER_SIZE + 7;
326 assert(p.data.getSize() >= headersize);
327 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
328 assert(type == TYPE_SPLIT);
329 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
330 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
331 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
333 // Add if doesn't exist
334 if(m_buf.find(seqnum) == NULL)
336 IncomingSplitPacket *sp = new IncomingSplitPacket();
337 sp->chunk_count = chunk_count;
338 sp->reliable = reliable;
// NOTE(review): the line storing the new sp into m_buf is not visible here;
// the lookup below relies on it.
342 IncomingSplitPacket *sp = m_buf[seqnum];
344 // TODO: These errors should be thrown or something? Dunno.
345 if(chunk_count != sp->chunk_count)
346 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
347 <<" != sp->chunk_count="<<sp->chunk_count
349 if(reliable != sp->reliable)
350 derr_con<<"Connection: WARNING: reliable="<<reliable
351 <<" != sp->reliable="<<sp->reliable
354 // If chunk already exists, cancel
355 if(sp->chunks.find(chunk_num) != NULL)
356 throw AlreadyExistsException("Chunk already in buffer");
358 // Cut chunk data out of packet
359 u32 chunkdatasize = p.data.getSize() - headersize;
360 SharedBuffer<u8> chunkdata(chunkdatasize);
361 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
363 // Set chunk data in buffer
364 sp->chunks[chunk_num] = chunkdata;
366 // If not all chunks are received, return
367 if(sp->allReceived() == false)
370 // Calculate total size
372 core::map<u16, SharedBuffer<u8> >::Iterator i;
373 i = sp->chunks.getIterator();
374 for(; i.atEnd() == false; i++)
376 totalsize += i.getNode()->getValue().getSize();
379 SharedBuffer<u8> fulldata(totalsize);
381 // Copy chunks to data buffer
383 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
386 SharedBuffer<u8> buf = sp->chunks[chunk_i];
// NOTE(review): the size is narrowed to u16 here; a chunk larger than 65535
// bytes would be copied only partially.
387 u16 chunkdatasize = buf.getSize();
388 memcpy(&fulldata[start], *buf, chunkdatasize);
389 start += chunkdatasize;;
392 // Remove sp from buffer
393 m_buf.remove(seqnum);
// The reassembled payload is handed to the caller through the exception.
396 throw GotSplitPacketException(fulldata);
// Two passes: first the keys of entries whose time has reached timeout are
// queued, then the queued keys are processed one by one. Queueing first avoids
// changing m_buf while iterating over it.
// NOTE(review): the use of dtime, the statement that skips reliable entries
// and the actual removal in the second loop are not visible in this extract.
398 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
400 core::list<u16> remove_queue;
401 core::map<u16, IncomingSplitPacket*>::Iterator i;
402 i = m_buf.getIterator();
403 for(; i.atEnd() == false; i++)
405 IncomingSplitPacket *p = i.getNode()->getValue();
406 // Reliable ones are not removed by timeout
407 if(p->reliable == true)
410 if(p->time >= timeout)
411 remove_queue.push_back(i.getNode()->getKey());
413 core::list<u16>::Iterator j;
414 j = remove_queue.begin();
415 for(; j != remove_queue.end(); j++)
417 dout_con<<"NOTE: Removing timed out unreliable split packet"
// Starts all three sequence counters at SEQNUM_INITIAL.
// NOTE(review): the enclosing function header is not visible in this extract;
// presumably this is a constructor body - confirm.
430 next_outgoing_seqnum = SEQNUM_INITIAL;
431 next_incoming_seqnum = SEQNUM_INITIAL;
432 next_outgoing_split_seqnum = SEQNUM_INITIAL;
// Sets the initial per-peer state: timeout_counter 0, resend_timeout 0.5 and
// has_sent_with_id false.
// NOTE(review): the initialisation of the remaining members, including how
// a_id and a_address are stored, is not visible in this extract.
442 Peer::Peer(u16 a_id, Address a_address)
446 timeout_counter = 0.0;
447 //resend_timeout = RESEND_TIMEOUT_MINIMUM;
449 resend_timeout = 0.5;
451 has_sent_with_id = false;
// Folds a new round-trip time sample into avg_rtt (weight 0.1 for the sample,
// 0.9 for the previous average) and recomputes resend_timeout as
// avg_rtt * RESEND_TIMEOUT_FACTOR limited to the range
// [RESEND_TIMEOUT_MIN, RESEND_TIMEOUT_MAX].
// NOTE(review): the first branch of the if chain and the statement taken when
// avg_rtt < 0.0 are not visible in this extract.
457 void Peer::reportRTT(float rtt)
461 else if(avg_rtt < 0.0)
464 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
466 // Calculate resend_timeout
468 /*int reliable_count = 0;
469 for(int i=0; i<CHANNEL_COUNT; i++)
471 reliable_count += channels[i].outgoing_reliables.size();
473 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
474 * ((float)reliable_count * 1);*/
476 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
477 if(timeout < RESEND_TIMEOUT_MIN)
478 timeout = RESEND_TIMEOUT_MIN;
479 if(timeout > RESEND_TIMEOUT_MAX)
480 timeout = RESEND_TIMEOUT_MAX;
481 resend_timeout = timeout;
// Stores the protocol id, maximum packet size and peer handler, and starts
// with m_peer_id set to PEER_ID_NEW. peerhandler must not be NULL (asserted).
// NOTE(review): part of the parameter list and possibly further member
// initialisation are not visible in this extract.
488 Connection::Connection(
492 PeerHandler *peerhandler
495 assert(peerhandler != NULL);
497 m_protocol_id = protocol_id;
498 m_max_packet_size = max_packet_size;
500 m_peer_id = PEER_ID_NEW;
501 //m_waiting_new_peer_id = false;
503 m_peerhandler = peerhandler;
// Iterates over every Peer pointer stored in m_peers.
// NOTE(review): what is done with each peer is not visible in this extract;
// presumably it is deleted - confirm.
506 Connection::~Connection()
509 core::map<u16, Peer*>::Iterator j;
510 j = m_peers.getIterator();
511 for(; j.atEnd() == false; j++)
513 Peer *peer = j.getNode()->getValue();
// Marks this connection as the server by setting m_peer_id to PEER_ID_SERVER.
// NOTE(review): the use of port (presumably binding the socket) is not visible
// in this extract.
518 void Connection::Serve(unsigned short port)
521 m_peer_id = PEER_ID_SERVER;
// Registers the server as a peer with id PEER_ID_SERVER, notifies the peer
// handler, resets the own id to PEER_ID_NEW and sends an empty reliable packet
// on channel 0 so the server sees a packet carrying PEER_ID_NEW.
// Throws ConnectionException with the message below; the guard is not visible
// here but presumably tests the result of the PEER_ID_SERVER lookup.
524 void Connection::Connect(Address address)
526 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
528 throw ConnectionException("Already connected to a server");
531 Peer *peer = new Peer(PEER_ID_SERVER, address);
532 m_peers.insert(peer->id, peer);
533 m_peerhandler->peerAdded(peer);
537 // Send a dummy packet to server with peer_id = PEER_ID_NEW
538 m_peer_id = PEER_ID_NEW;
539 SharedBuffer<u8> data(0);
540 Send(PEER_ID_SERVER, 0, data, true);
542 //m_waiting_new_peer_id = true;
// Builds a 2-byte control packet (TYPE_CONTROL, CONTROLTYPE_DISCO) and sends it
// unreliably on channel 0 to every peer in m_peers.
545 void Connection::Disconnect()
547 // Create and send DISCO packet
548 SharedBuffer<u8> data(2);
549 writeU8(&data[0], TYPE_CONTROL);
550 writeU8(&data[1], CONTROLTYPE_DISCO);
553 core::map<u16, Peer*>::Iterator j;
554 j = m_peers.getIterator();
555 for(; j.atEnd() == false; j++)
557 Peer *peer = j.getNode()->getValue();
558 SendAsPacket(peer->id, 0, data, false);
// Evaluates three conditions in turn: whether m_peers holds exactly one entry,
// whether a peer with id PEER_ID_SERVER exists, and whether the own id is still
// PEER_ID_NEW.
// NOTE(review): the return statements are not visible in this extract, so the
// result for each condition cannot be stated from here.
562 bool Connection::Connected()
564 if(m_peers.size() != 1)
567 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
571 if(m_peer_id == PEER_ID_NEW)
// Dispatches one packet on its first byte (the type):
//  - TYPE_CONTROL: ACK, SET_PEER_ID, PING and DISCO are handled in place and
//    reported to the caller by throwing ProcessedSilentlyException.
//  - TYPE_ORIGINAL: the payload after ORIGINAL_HEADER_SIZE is copied out.
//  - TYPE_SPLIT: the chunk is re-packed and handed to incoming_splits.
//  - TYPE_RELIABLE: an ACK is sent back; packets ahead of next_incoming_seqnum
//    are buffered in incoming_reliables, older ones are rejected, and the
//    remaining case is unwrapped and processed recursively with reliable=true.
// Throws InvalidIncomingDataException for short packets and unknown types.
// NOTE(review): several parameters (con, peer_id, channelnum and reliable are
// used below), the try keywords and most return statements are not visible in
// this extract.
577 SharedBuffer<u8> Channel::ProcessPacket(
578 SharedBuffer<u8> packetdata,
584 IndentationRaiser iraiser(&(con->m_indentation));
586 if(packetdata.getSize() < 1)
587 throw InvalidIncomingDataException("packetdata.getSize() < 1");
589 u8 type = readU8(&packetdata[0]);
591 if(type == TYPE_CONTROL)
593 if(packetdata.getSize() < 2)
594 throw InvalidIncomingDataException("packetdata.getSize() < 2");
596 u8 controltype = readU8(&packetdata[1]);
598 if(controltype == CONTROLTYPE_ACK)
600 if(packetdata.getSize() < 4)
601 throw InvalidIncomingDataException
602 ("packetdata.getSize() < 4 (ACK header size)");
604 u16 seqnum = readU16(&packetdata[2]);
606 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
607 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
608 <<", seqnum="<<seqnum<<std::endl;
// The acknowledged packet leaves the resend buffer; its accumulated totaltime
// is used as the round-trip time sample.
611 BufferedPacket p = outgoing_reliables.popSeqnum(seqnum);
612 // Get round trip time
613 float rtt = p.totaltime;
615 // Let peer calculate stuff according to it
616 // (avg_rtt and resend_timeout)
617 Peer *peer = con->GetPeer(peer_id);
618 peer->reportRTT(rtt);
620 //con->PrintInfo(dout_con);
621 //dout_con<<"RTT = "<<rtt<<std::endl;
623 /*dout_con<<"OUTGOING: ";
625 outgoing_reliables.print();
626 dout_con<<std::endl;*/
628 catch(NotFoundException &e){
629 con->PrintInfo(derr_con);
630 derr_con<<"WARNING: ACKed packet not "
635 throw ProcessedSilentlyException("Got an ACK");
637 else if(controltype == CONTROLTYPE_SET_PEER_ID)
639 if(packetdata.getSize() < 4)
640 throw InvalidIncomingDataException
641 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
642 u16 peer_id_new = readU16(&packetdata[2]);
644 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
// The visible warning branch covers an id other than PEER_ID_NEW; the else
// separating it from SetPeerID below is not visible in this extract.
646 if(con->GetPeerID() != PEER_ID_NEW)
648 con->PrintInfo(derr_con);
649 derr_con<<"WARNING: Not changing"
650 " existing peer id."<<std::endl;
654 dout_con<<"changing."<<std::endl;
655 con->SetPeerID(peer_id_new);
657 throw ProcessedSilentlyException("Got a SET_PEER_ID");
659 else if(controltype == CONTROLTYPE_PING)
661 // Just ignore it, the incoming data already reset
662 // the timeout counter
664 dout_con<<"PING"<<std::endl;
665 throw ProcessedSilentlyException("Got a PING");
667 else if(controltype == CONTROLTYPE_DISCO)
669 // Just ignore it, the incoming data already reset
670 // the timeout counter
672 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
674 if(con->deletePeer(peer_id, false) == false)
676 con->PrintInfo(derr_con);
677 derr_con<<"DISCO: Peer not found"<<std::endl;
680 throw ProcessedSilentlyException("Got a DISCO");
683 con->PrintInfo(derr_con);
684 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
685 <<((int)controltype&0xff)<<std::endl;
686 throw InvalidIncomingDataException("Invalid control type");
689 else if(type == TYPE_ORIGINAL)
691 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
692 throw InvalidIncomingDataException
693 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
695 dout_con<<"RETURNING TYPE_ORIGINAL to user"
697 // Get the inside packet out and return it
698 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
699 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
702 else if(type == TYPE_SPLIT)
704 // We have to create a packet again for buffering
705 // This isn't actually too bad an idea.
706 BufferedPacket packet = makePacket(
707 con->GetPeer(peer_id)->address,
709 con->GetProtocolID(),
714 incoming_splits.insert(packet, reliable);
716 // This exception happens when all the pieces of a packet
718 catch(GotSplitPacketException &e)
721 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
722 <<"size="<<e.getData().getSize()<<std::endl;
726 dout_con<<"BUFFERING TYPE_SPLIT"<<std::endl;
727 throw ProcessedSilentlyException("Buffered a split packet chunk");
729 else if(type == TYPE_RELIABLE)
// NOTE(review): reliable is true on the recursive calls made with unwrapped
// payloads, so a peer that nests a reliable packet inside another one would
// trip this assert instead of getting InvalidIncomingDataException - confirm
// this is acceptable for data that comes from the network.
731 // Recursive reliable packets not allowed
732 assert(reliable == false);
734 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
735 throw InvalidIncomingDataException
736 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
738 u16 seqnum = readU16(&packetdata[1]);
740 bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum);
741 bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum);
745 dout_con<<"BUFFERING";
746 else if(is_old_packet)
750 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
751 <<" next="<<next_incoming_seqnum;
752 dout_con<<" [sending CONTROLTYPE_ACK"
753 " to peer_id="<<peer_id<<"]";
757 //assert(incoming_reliables.size() < 100);
// The ACK itself goes out unreliably; it is sent before the future/old
// classification below is acted upon.
759 // Send a CONTROLTYPE_ACK
760 SharedBuffer<u8> reply(4);
761 writeU8(&reply[0], TYPE_CONTROL);
762 writeU8(&reply[1], CONTROLTYPE_ACK);
763 writeU16(&reply[2], seqnum);
764 con->SendAsPacket(peer_id, channelnum, reply, false);
766 //if(seqnum_higher(seqnum, next_incoming_seqnum))
770 dout_con<<"Buffering reliable packet (seqnum="
771 <<seqnum<<")"<<std::endl;*/
773 // This one comes later, buffer it.
774 // Actually we have to make a packet to buffer one.
775 // Well, we have all the ingredients, so just do it.
776 BufferedPacket packet = makePacket(
777 con->GetPeer(peer_id)->address,
779 con->GetProtocolID(),
783 incoming_reliables.insert(packet);
786 dout_con<<"INCOMING: ";
787 incoming_reliables.print();
788 dout_con<<std::endl;*/
790 catch(AlreadyExistsException &e)
794 throw ProcessedSilentlyException("Buffered future reliable packet");
796 //else if(seqnum_higher(next_incoming_seqnum, seqnum))
797 else if(is_old_packet)
799 // An old packet, dump it
800 throw InvalidIncomingDataException("Got an old reliable packet");
// Neither future nor old: advance the expected sequence number and unwrap.
803 next_incoming_seqnum++;
805 // Get out the inside packet and re-process it
806 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
807 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
809 return ProcessPacket(payload, con, peer_id, channelnum, true);
813 con->PrintInfo(derr_con);
814 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
815 throw InvalidIncomingDataException("Invalid packet type");
818 // We should never get here.
819 // If you get here, add an exception or a return to some of the
820 // above conditionals.
822 throw BaseException("Error in Channel::ProcessPacket()");
// Delivers the next in-order packet from incoming_reliables, if there is one.
// Entries at the front that are older than next_incoming_seqnum are popped
// first. If the first remaining entry carries exactly next_incoming_seqnum it
// is popped, the counter is advanced, the base and reliable headers are cut
// off and the payload is passed to ProcessPacket with reliable=true.
// Throws NoIncomingDataException when nothing could be delivered.
// NOTE(review): the remaining parameters (peer_id is assigned below), the
// declaration of firstseqnum, the loop around the cleanup and the try keyword
// are not visible in this extract.
825 SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con,
829 // Clear old packets from start of buffer
832 firstseqnum = incoming_reliables.getFirstSeqnum();
833 if(seqnum_higher(next_incoming_seqnum, firstseqnum))
834 incoming_reliables.popFirst();
838 // This happens if all packets are old
839 }catch(con::NotFoundException)
842 if(incoming_reliables.empty() == false)
844 if(firstseqnum == next_incoming_seqnum)
846 BufferedPacket p = incoming_reliables.popFirst();
// Peer id and channel are recovered from the stored base header.
848 peer_id = readPeerId(*p.data);
849 u8 channelnum = readChannel(*p.data);
850 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
853 dout_con<<"UNBUFFERING TYPE_RELIABLE"
855 <<" peer_id="<<peer_id
856 <<" channel="<<((int)channelnum&0xff)
859 next_incoming_seqnum++;
861 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
862 // Get out the inside packet and re-process it
863 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
864 memcpy(*payload, &p.data[headers_size], payload.getSize());
866 return ProcessPacket(payload, con, peer_id, channelnum, true);
870 throw NoIncomingDataException("No relevant data in buffers");
// Asks every channel (0 to CHANNEL_COUNT-1) of every peer for buffered data
// via CheckIncomingBuffers. NoIncomingDataException,
// InvalidIncomingDataException and ProcessedSilentlyException from a channel
// are caught here. Throws NoIncomingDataException when the loops finish
// without a result.
// NOTE(review): the arguments of the CheckIncomingBuffers call, the return of
// resultdata and the bodies of the catch handlers are not visible in this
// extract.
873 SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id)
875 core::map<u16, Peer*>::Iterator j;
876 j = m_peers.getIterator();
877 for(; j.atEnd() == false; j++)
879 Peer *peer = j.getNode()->getValue();
880 for(u16 i=0; i<CHANNEL_COUNT; i++)
882 Channel *channel = &peer->channels[i];
884 SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers
889 catch(NoIncomingDataException &e)
892 catch(InvalidIncomingDataException &e)
895 catch(ProcessedSilentlyException &e)
900 throw NoIncomingDataException("No relevant data in buffers");
// Copies one complete payload into data and returns its size; peer_id is set
// to the sender.
// Order of work in the visible lines:
//  1. Try the buffered packets first (GetFromBuffers).
//  2. Otherwise read one datagram from m_socket and validate size, protocol
//     id and channel number.
//  3. A sender using PEER_ID_NEW is matched by address against known peers
//     that have not yet sent with an id, or gets a fresh id (starting at 2)
//     and a reliable SET_PEER_ID control packet.
//  4. The sender address is checked, the timeout counter of the peer is
//     reset and the data without base header goes to Channel::ProcessPacket.
// Throws NoIncomingDataException when the socket read returns a negative
// size, InvalidIncomingDataException for malformed input or when data is too
// small for the result, and ConnectionException when peer ids run out.
// NOTE(review): the loop and try keywords, the declaration of sender and
// several branch bodies are not visible in this extract.
903 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
906 Receive a packet from the network
909 // TODO: We can not know how many layers of header there are.
910 // For now, just assume there are no other than the base headers.
911 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
912 Buffer<u8> packetdata(packet_maxsize);
919 Check if some buffer has relevant data
922 SharedBuffer<u8> resultdata = GetFromBuffers(peer_id);
924 if(datasize < resultdata.getSize())
925 throw InvalidIncomingDataException
926 ("Buffer too small for received data");
928 memcpy(data, *resultdata, resultdata.getSize());
929 return resultdata.getSize();
931 catch(NoIncomingDataException &e)
937 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
939 if(received_size < 0)
940 throw NoIncomingDataException("No incoming data");
941 if(received_size < BASE_HEADER_SIZE)
942 throw InvalidIncomingDataException("No full header received");
943 if(readU32(&packetdata[0]) != m_protocol_id)
944 throw InvalidIncomingDataException("Invalid protocol id");
946 peer_id = readPeerId(*packetdata);
947 u8 channelnum = readChannel(*packetdata);
948 if(channelnum > CHANNEL_COUNT-1){
// NOTE(review): channelnum is a u8 streamed without a cast here, while other
// log lines in this file use ((int)channelnum&0xff); it presumably prints as
// a character rather than a number.
950 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
951 throw InvalidIncomingDataException("Channel doesn't exist");
954 if(peer_id == PEER_ID_NEW)
957 Somebody is trying to send stuff to us with no peer id.
959 Check if the same address and port was added to our peer
961 Allow only entries that have has_sent_with_id==false.
964 core::map<u16, Peer*>::Iterator j;
965 j = m_peers.getIterator();
966 for(; j.atEnd() == false; j++)
968 Peer *peer = j.getNode()->getValue();
969 if(peer->has_sent_with_id)
971 if(peer->address == sender)
976 If no peer was found with the same address and port,
977 we shall assume it is a new peer and create an entry.
981 // Pass on to adding the peer
983 // Else: A peer was found.
986 Peer *peer = j.getNode()->getValue();
989 derr_con<<"WARNING: Assuming unknown peer to be "
990 <<"peer_id="<<peer_id<<std::endl;
995 The peer was not found in our lists. Add it.
997 if(peer_id == PEER_ID_NEW)
999 // Somebody wants to make a new connection
1001 // Get a unique peer id (2 or higher)
1002 u16 peer_id_new = 2;
1004 Find an unused peer id
1009 if(m_peers.find(peer_id_new) == NULL)
1011 // Check for overflow
1012 if(peer_id_new == 65535)
1013 throw ConnectionException
1014 ("Connection ran out of peer ids");
1019 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_NEW,"
1020 " giving peer_id="<<peer_id_new<<std::endl;
1023 Peer *peer = new Peer(peer_id_new, sender);
1024 m_peers.insert(peer->id, peer);
1025 m_peerhandler->peerAdded(peer);
1027 // Create CONTROL packet to tell the peer id to the new peer.
1028 SharedBuffer<u8> reply(4);
1029 writeU8(&reply[0], TYPE_CONTROL);
1030 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
1031 writeU16(&reply[2], peer_id_new);
1032 SendAsPacket(peer_id_new, 0, reply, true);
1034 // We're now talking to a valid peer_id
1035 peer_id = peer_id_new;
1037 // Go on and process whatever it sent
1040 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1045 // This means that the peer id of the sender is not PEER_ID_NEW
1046 // and it is invalid.
1047 PrintInfo(derr_con);
1048 derr_con<<"Receive(): Peer not found"<<std::endl;
1049 throw InvalidIncomingDataException("Peer not found (possible timeout)");
1052 Peer *peer = node->getValue();
1054 // Validate peer address
1055 if(peer->address != sender)
1057 PrintInfo(derr_con);
1058 derr_con<<"Peer "<<peer_id<<" sending from different address."
1059 " Ignoring."<<std::endl;
1060 throw InvalidIncomingDataException
1061 ("Peer sending from different address");
1062 /*// If there is more data, receive again
1063 if(m_socket.WaitData(0) == true)
1065 throw NoIncomingDataException("No incoming data (2)");*/
// Any accepted packet counts as a sign of life from the peer.
1068 peer->timeout_counter = 0.0;
1070 Channel *channel = &(peer->channels[channelnum]);
1072 // Throw the received packet to channel->processPacket()
1074 // Make a new SharedBuffer from the data without the base headers
1075 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1076 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1077 strippeddata.getSize());
1080 // Process it (the result is some data with no headers made by us)
1081 SharedBuffer<u8> resultdata = channel->ProcessPacket
1082 (strippeddata, this, peer_id, channelnum);
1085 dout_con<<"ProcessPacket returned data of size "
1086 <<resultdata.getSize()<<std::endl;
1088 if(datasize < resultdata.getSize())
1089 throw InvalidIncomingDataException
1090 ("Buffer too small for received data");
1092 memcpy(data, *resultdata, resultdata.getSize());
1093 return resultdata.getSize();
1095 catch(ProcessedSilentlyException &e)
1097 // If there is more data, receive again
1098 if(m_socket.WaitData(0) == true)
1101 throw NoIncomingDataException("No incoming data (2)");
1103 catch(InvalidIncomingDataException &e)
1105 // If there is more data, receive again
1106 if(m_socket.WaitData(0) == true)
// Sends data on channelnum to every peer in m_peers by calling Send once per
// peer with the same reliable flag.
1112 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1114 core::map<u16, Peer*>::Iterator j;
1115 j = m_peers.getIterator();
1116 for(; j.atEnd() == false; j++)
1118 Peer *peer = j.getNode()->getValue();
1119 Send(peer->id, channelnum, data, reliable);
// Sends data to one peer, splitting it when it does not fit: the payload is
// turned into one or more packets by makeAutoSplitPacket and each of them is
// passed to SendAsPacket. channelnum must be below CHANNEL_COUNT (asserted).
// The chunk limit is m_max_packet_size minus the base header, reduced further
// by RELIABLE_HEADER_SIZE.
// NOTE(review): the condition guarding the RELIABLE_HEADER_SIZE subtraction is
// not visible in this extract; presumably it applies to reliable sends only.
1123 void Connection::Send(u16 peer_id, u8 channelnum,
1124 SharedBuffer<u8> data, bool reliable)
1126 assert(channelnum < CHANNEL_COUNT);
1128 Peer *peer = GetPeer(peer_id);
1129 Channel *channel = &(peer->channels[channelnum]);
1131 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1133 chunksize_max -= RELIABLE_HEADER_SIZE;
1135 core::list<SharedBuffer<u8> > originals;
1136 originals = makeAutoSplitPacket(data, chunksize_max,
1137 channel->next_outgoing_split_seqnum);
1139 core::list<SharedBuffer<u8> >::Iterator i;
1140 i = originals.begin();
1141 for(; i != originals.end(); i++)
1143 SharedBuffer<u8> original = *i;
1145 SendAsPacket(peer_id, channelnum, original, reliable);
// Wraps data into a single packet for one peer. Two paths are visible:
//  - with a sequence number: the next outgoing seqnum of the channel is taken
//    and incremented, the data is wrapped by makeReliablePacket, base headers
//    are added and the packet is stored in outgoing_reliables; a duplicate
//    seqnum there is only logged.
//  - without: base headers are added to data directly.
// NOTE(review): the if/else on the reliable flag, the try keyword and the
// calls that hand the packet to the socket are not visible in this extract.
// NOTE(review): the local buffer named reliable below has the same name as the
// bool parameter and presumably shadows it inside the branch.
1149 void Connection::SendAsPacket(u16 peer_id, u8 channelnum,
1150 SharedBuffer<u8> data, bool reliable)
1152 Peer *peer = GetPeer(peer_id);
1153 Channel *channel = &(peer->channels[channelnum]);
1157 u16 seqnum = channel->next_outgoing_seqnum;
1158 channel->next_outgoing_seqnum++;
1160 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1162 // Add base headers and make a packet
1163 BufferedPacket p = makePacket(peer->address, reliable,
1164 m_protocol_id, m_peer_id, channelnum);
1167 // Buffer the packet
1168 channel->outgoing_reliables.insert(p);
1170 catch(AlreadyExistsException &e)
1172 PrintInfo(derr_con);
1173 derr_con<<"WARNING: Going to send a reliable packet "
1174 "seqnum="<<seqnum<<" that is already "
1175 "in outgoing buffer"<<std::endl;
1184 // Add base headers and make a packet
1185 BufferedPacket p = makePacket(peer->address, data,
1186 m_protocol_id, m_peer_id, channelnum);
// Hands the complete packet bytes to m_socket for the address stored in the
// packet; no headers are added here.
1193 void Connection::RawSend(const BufferedPacket &packet)
1195 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
// Periodic housekeeping; dtime is added to the per-peer timers.
// For every peer:
//  - timeout_counter is advanced; peers above m_timeout are queued for removal.
//  - per channel: stale unreliable split packets are dropped, reliable packet
//    timers are advanced, a peer with a reliable packet older than m_timeout is
//    queued for removal, and reliables whose time reached the resend timeout
//    are collected and reset (the resend call itself is not visible here).
//  - ping_timer is advanced; at 5.0 or more a reliable PING is sent on
//    channel 0 and the timer restarts.
// Queued peers are deleted after the iteration so m_peers is not modified
// while it is being walked.
// NOTE(review): the inner iterator j declared in the channel loop has the same
// name as the outer peer iterator, and the local u8 channel in the resend loop
// has the same name as the Channel pointer; both presumably shadow the outer
// names.
1198 void Connection::RunTimeouts(float dtime)
1200 core::list<u16> timeouted_peers;
1201 core::map<u16, Peer*>::Iterator j;
1202 j = m_peers.getIterator();
1203 for(; j.atEnd() == false; j++)
1205 Peer *peer = j.getNode()->getValue();
1210 peer->timeout_counter += dtime;
1211 if(peer->timeout_counter > m_timeout)
1213 PrintInfo(derr_con);
1214 derr_con<<"RunTimeouts(): Peer "<<peer->id
1216 <<" (source=peer->timeout_counter)"
1218 // Add peer to the list
1219 timeouted_peers.push_back(peer->id);
1220 // Don't bother going through the buffers of this one
// The resend timeout is read once per peer, so reportRTT calls made below do
// not change it for the remaining channels.
1224 float resend_timeout = peer->resend_timeout;
1225 for(u16 i=0; i<CHANNEL_COUNT; i++)
1227 core::list<BufferedPacket> timed_outs;
1228 core::list<BufferedPacket>::Iterator j;
1230 Channel *channel = &peer->channels[i];
1232 // Remove timed out incomplete unreliable split packets
1233 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
1235 // Increment reliable packet times
1236 channel->outgoing_reliables.incrementTimeouts(dtime);
1238 // Check reliable packet total times, remove peer if
1240 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
1242 PrintInfo(derr_con);
1243 derr_con<<"RunTimeouts(): Peer "<<peer->id
1245 <<" (source=reliable packet totaltime)"
1247 // Add peer to the to-be-removed list
1248 timeouted_peers.push_back(peer->id);
1252 // Re-send timed out outgoing reliables
1254 timed_outs = channel->
1255 outgoing_reliables.getTimedOuts(resend_timeout);
1257 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
1259 j = timed_outs.begin();
1260 for(; j != timed_outs.end(); j++)
1262 u16 peer_id = readPeerId(*(j->data));
1263 u8 channel = readChannel(*(j->data));
1264 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
1266 PrintInfo(derr_con);
1267 derr_con<<"RE-SENDING timed-out RELIABLE to ";
1268 j->address.print(&derr_con);
1269 derr_con<<"(t/o="<<resend_timeout<<"): "
1270 <<"from_peer_id="<<peer_id
1271 <<", channel="<<((int)channel&0xff)
1272 <<", seqnum="<<seqnum
1277 // Enlarge avg_rtt and resend_timeout:
1278 // The rtt will be at least the timeout.
1279 // NOTE: This won't affect the timeout of the next
1280 // checked channel because it was cached.
1281 peer->reportRTT(resend_timeout);
1288 peer->ping_timer += dtime;
1289 if(peer->ping_timer >= 5.0)
1291 // Create and send PING packet
1292 SharedBuffer<u8> data(2);
1293 writeU8(&data[0], TYPE_CONTROL);
1294 writeU8(&data[1], CONTROLTYPE_PING);
1295 SendAsPacket(peer->id, 0, data, true);
1297 peer->ping_timer = 0.0;
1304 // Remove timed out peers
1305 core::list<u16>::Iterator i = timeouted_peers.begin();
1306 for(; i != timeouted_peers.end(); i++)
1308 PrintInfo(derr_con);
1309 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1310 deletePeer(*i, true);
// Looks up peer_id in m_peers and returns the stored Peer pointer.
// Throws PeerNotFoundException with the message below; the guard is not
// visible here but presumably tests for a failed lookup.
// The assert checks that the id stored in the peer matches its map key.
1314 Peer* Connection::GetPeer(u16 peer_id)
1316 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1320 throw PeerNotFoundException("Peer not found (possible timeout)");
1324 assert(node->getValue()->id == peer_id);
1326 return node->getValue();
// Looks up peer_id in m_peers and returns the stored Peer pointer; the assert
// checks that the id stored in the peer matches its map key.
// NOTE(review): the handling of a failed lookup is not visible in this extract;
// going by the name it presumably returns NULL instead of throwing - confirm.
1329 Peer* Connection::GetPeerNoEx(u16 peer_id)
1331 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1338 assert(node->getValue()->id == peer_id);
1340 return node->getValue();
// Copies the Peer pointers held in m_peers into a new list; the pointed-to
// peers are shared with the map, not copied.
// NOTE(review): the return statement is not visible in this extract.
1343 core::list<Peer*> Connection::GetPeers()
1345 core::list<Peer*> list;
1346 core::map<u16, Peer*>::Iterator j;
1347 j = m_peers.getIterator();
1348 for(; j.atEnd() == false; j++)
1350 Peer *peer = j.getNode()->getValue();
1351 list.push_back(peer);
// Removes one peer: the peer handler is notified first (with the timeout
// flag), then the Peer object is deleted and its entry removed from m_peers.
// NOTE(review): the return statements are not visible in this extract;
// a caller in this file compares the result against false to detect a missing
// peer, so false is presumably returned when the lookup fails.
// NOTE(review): the peer is looked up three separate times (find, then
// operator[] twice); the node from find could be reused.
1356 bool Connection::deletePeer(u16 peer_id, bool timeout)
1358 if(m_peers.find(peer_id) == NULL)
1360 m_peerhandler->deletingPeer(m_peers[peer_id], timeout);
1361 delete m_peers[peer_id];
1362 m_peers.remove(peer_id);
// Writes a log prefix to out: the socket handle, then the own peer id, then a
// loop that runs m_indentation-1 times.
// NOTE(review): the statement inside the loop is not visible in this extract;
// presumably it emits indentation - confirm.
1366 void Connection::PrintInfo(std::ostream &out)
1368 out<<m_socket.GetHandle();
1370 out<<"con "<<m_peer_id<<": ";
1371 for(s16 i=0; i<(s16)m_indentation-1; i++)
// Convenience overload: writes the log prefix to dout_con.
1375 void Connection::PrintInfo()
1377 PrintInfo(dout_con);