3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
27 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
28 u32 protocol_id, u16 sender_peer_id, u8 channel)
30 u32 packet_size = datasize + BASE_HEADER_SIZE;
31 BufferedPacket p(packet_size);
34 writeU32(&p.data[0], protocol_id);
35 writeU16(&p.data[4], sender_peer_id);
36 writeU8(&p.data[6], channel);
38 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
43 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
44 u32 protocol_id, u16 sender_peer_id, u8 channel)
46 return makePacket(address, *data, data.getSize(),
47 protocol_id, sender_peer_id, channel);
50 SharedBuffer<u8> makeOriginalPacket(
51 SharedBuffer<u8> data)
54 u32 packet_size = data.getSize() + header_size;
55 SharedBuffer<u8> b(packet_size);
57 writeU8(&b[0], TYPE_ORIGINAL);
59 memcpy(&b[header_size], *data, data.getSize());
64 core::list<SharedBuffer<u8> > makeSplitPacket(
65 SharedBuffer<u8> data,
69 // Chunk packets, containing the TYPE_SPLIT header
70 core::list<SharedBuffer<u8> > chunks;
72 u32 chunk_header_size = 7;
73 u32 maximum_data_size = chunksize_max - chunk_header_size;
78 end = start + maximum_data_size - 1;
79 if(end > data.getSize() - 1)
80 end = data.getSize() - 1;
82 u32 payload_size = end - start + 1;
83 u32 packet_size = chunk_header_size + payload_size;
85 SharedBuffer<u8> chunk(packet_size);
87 writeU8(&chunk[0], TYPE_SPLIT);
88 writeU16(&chunk[1], seqnum);
89 // [3] u16 chunk_count is written at next stage
90 writeU16(&chunk[5], chunk_num);
91 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
93 chunks.push_back(chunk);
98 while(end != data.getSize() - 1);
100 u16 chunk_count = chunks.getSize();
102 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
103 for(; i != chunks.end(); i++)
106 writeU16(&((*i)[3]), chunk_count);
112 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
113 SharedBuffer<u8> data,
117 u32 original_header_size = 1;
118 core::list<SharedBuffer<u8> > list;
119 if(data.getSize() + original_header_size > chunksize_max)
121 list = makeSplitPacket(data, chunksize_max, split_seqnum);
127 list.push_back(makeOriginalPacket(data));
132 SharedBuffer<u8> makeReliablePacket(
133 SharedBuffer<u8> data,
136 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
137 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
138 <<((unsigned int)data[0]&0xff)<<std::endl;*/
140 u32 packet_size = data.getSize() + header_size;
141 SharedBuffer<u8> b(packet_size);
143 writeU8(&b[0], TYPE_RELIABLE);
144 writeU16(&b[1], seqnum);
146 memcpy(&b[header_size], *data, data.getSize());
148 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
149 <<((unsigned int)data[0]&0xff)<<std::endl;*/
150 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
158 void ReliablePacketBuffer::print()
160 core::list<BufferedPacket>::Iterator i;
162 for(; i != m_list.end(); i++)
164 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
168 bool ReliablePacketBuffer::empty()
170 return m_list.empty();
172 u32 ReliablePacketBuffer::size()
174 return m_list.getSize();
176 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
178 core::list<BufferedPacket>::Iterator i;
180 for(; i != m_list.end(); i++)
182 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
183 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
184 <<", comparing to s="<<s<<std::endl;*/
190 RPBSearchResult ReliablePacketBuffer::notFound()
194 u16 ReliablePacketBuffer::getFirstSeqnum()
197 throw NotFoundException("Buffer is empty");
198 BufferedPacket p = *m_list.begin();
199 return readU16(&p.data[BASE_HEADER_SIZE+1]);
201 BufferedPacket ReliablePacketBuffer::popFirst()
204 throw NotFoundException("Buffer is empty");
205 BufferedPacket p = *m_list.begin();
206 core::list<BufferedPacket>::Iterator i = m_list.begin();
210 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
212 RPBSearchResult r = findPacket(seqnum);
214 dout_con<<"Not found"<<std::endl;
215 throw NotFoundException("seqnum not found in buffer");
217 BufferedPacket p = *r;
221 void ReliablePacketBuffer::insert(BufferedPacket &p)
223 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
224 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
225 assert(type == TYPE_RELIABLE);
226 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
228 // Find the right place for the packet and insert it there
230 // If list is empty, just add it
237 // Otherwise find the right place
238 core::list<BufferedPacket>::Iterator i;
240 // Find the first packet in the list which has a higher seqnum
241 for(; i != m_list.end(); i++){
242 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
244 throw AlreadyExistsException("Same seqnum in list");
246 if(seqnum_higher(s, seqnum)){
250 // If we're at the end of the list, add the packet to the
252 if(i == m_list.end())
259 m_list.insert_before(i, p);
262 void ReliablePacketBuffer::incrementTimeouts(float dtime)
264 core::list<BufferedPacket>::Iterator i;
266 for(; i != m_list.end(); i++){
268 i->totaltime += dtime;
272 void ReliablePacketBuffer::resetTimedOuts(float timeout)
274 core::list<BufferedPacket>::Iterator i;
276 for(; i != m_list.end(); i++){
277 if(i->time >= timeout)
282 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
284 core::list<BufferedPacket>::Iterator i;
286 for(; i != m_list.end(); i++){
287 if(i->totaltime >= timeout)
293 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
295 core::list<BufferedPacket> timed_outs;
296 core::list<BufferedPacket>::Iterator i;
298 for(; i != m_list.end(); i++)
300 if(i->time >= timeout)
301 timed_outs.push_back(*i);
310 IncomingSplitBuffer::~IncomingSplitBuffer()
312 core::map<u16, IncomingSplitPacket*>::Iterator i;
313 i = m_buf.getIterator();
314 for(; i.atEnd() == false; i++)
316 delete i.getNode()->getValue();
320 This will throw a GotSplitPacketException when a full
321 split packet is constructed.
323 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
325 u32 headersize = BASE_HEADER_SIZE + 7;
326 assert(p.data.getSize() >= headersize);
327 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
328 assert(type == TYPE_SPLIT);
329 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
330 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
331 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
333 // Add if doesn't exist
334 if(m_buf.find(seqnum) == NULL)
336 IncomingSplitPacket *sp = new IncomingSplitPacket();
337 sp->chunk_count = chunk_count;
338 sp->reliable = reliable;
342 IncomingSplitPacket *sp = m_buf[seqnum];
344 // TODO: These errors should be thrown or something? Dunno.
345 if(chunk_count != sp->chunk_count)
346 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
347 <<" != sp->chunk_count="<<sp->chunk_count
349 if(reliable != sp->reliable)
350 derr_con<<"Connection: WARNING: reliable="<<reliable
351 <<" != sp->reliable="<<sp->reliable
354 // If chunk already exists, cancel
355 if(sp->chunks.find(chunk_num) != NULL)
356 throw AlreadyExistsException("Chunk already in buffer");
358 // Cut chunk data out of packet
359 u32 chunkdatasize = p.data.getSize() - headersize;
360 SharedBuffer<u8> chunkdata(chunkdatasize);
361 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
363 // Set chunk data in buffer
364 sp->chunks[chunk_num] = chunkdata;
366 // If not all chunks are received, return empty buffer
367 if(sp->allReceived() == false)
368 return SharedBuffer<u8>();
370 // Calculate total size
372 core::map<u16, SharedBuffer<u8> >::Iterator i;
373 i = sp->chunks.getIterator();
374 for(; i.atEnd() == false; i++)
376 totalsize += i.getNode()->getValue().getSize();
379 SharedBuffer<u8> fulldata(totalsize);
381 // Copy chunks to data buffer
383 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
386 SharedBuffer<u8> buf = sp->chunks[chunk_i];
387 u16 chunkdatasize = buf.getSize();
388 memcpy(&fulldata[start], *buf, chunkdatasize);
389 start += chunkdatasize;;
392 // Remove sp from buffer
393 m_buf.remove(seqnum);
398 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
400 core::list<u16> remove_queue;
401 core::map<u16, IncomingSplitPacket*>::Iterator i;
402 i = m_buf.getIterator();
403 for(; i.atEnd() == false; i++)
405 IncomingSplitPacket *p = i.getNode()->getValue();
406 // Reliable ones are not removed by timeout
407 if(p->reliable == true)
410 if(p->time >= timeout)
411 remove_queue.push_back(i.getNode()->getKey());
413 core::list<u16>::Iterator j;
414 j = remove_queue.begin();
415 for(; j != remove_queue.end(); j++)
417 dout_con<<"NOTE: Removing timed out unreliable split packet"
430 next_outgoing_seqnum = SEQNUM_INITIAL;
431 next_incoming_seqnum = SEQNUM_INITIAL;
432 next_outgoing_split_seqnum = SEQNUM_INITIAL;
442 Peer::Peer(u16 a_id, Address a_address)
446 timeout_counter = 0.0;
447 //resend_timeout = RESEND_TIMEOUT_MINIMUM;
449 resend_timeout = 0.5;
451 has_sent_with_id = false;
457 void Peer::reportRTT(float rtt)
461 else if(avg_rtt < 0.0)
464 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
466 // Calculate resend_timeout
468 /*int reliable_count = 0;
469 for(int i=0; i<CHANNEL_COUNT; i++)
471 reliable_count += channels[i].outgoing_reliables.size();
473 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
474 * ((float)reliable_count * 1);*/
476 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
477 if(timeout < RESEND_TIMEOUT_MIN)
478 timeout = RESEND_TIMEOUT_MIN;
479 if(timeout > RESEND_TIMEOUT_MAX)
480 timeout = RESEND_TIMEOUT_MAX;
481 resend_timeout = timeout;
488 Connection::Connection(
492 PeerHandler *peerhandler
495 assert(peerhandler != NULL);
497 m_protocol_id = protocol_id;
498 m_max_packet_size = max_packet_size;
500 m_peer_id = PEER_ID_INEXISTENT;
501 //m_waiting_new_peer_id = false;
503 m_peerhandler = peerhandler;
506 Connection::~Connection()
509 core::map<u16, Peer*>::Iterator j;
510 j = m_peers.getIterator();
511 for(; j.atEnd() == false; j++)
513 Peer *peer = j.getNode()->getValue();
518 void Connection::Serve(unsigned short port)
521 m_peer_id = PEER_ID_SERVER;
524 void Connection::Connect(Address address)
526 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
528 throw ConnectionException("Already connected to a server");
531 Peer *peer = new Peer(PEER_ID_SERVER, address);
532 m_peers.insert(peer->id, peer);
533 m_peerhandler->peerAdded(peer);
537 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
538 m_peer_id = PEER_ID_INEXISTENT;
539 SharedBuffer<u8> data(0);
540 Send(PEER_ID_SERVER, 0, data, true);
542 //m_waiting_new_peer_id = true;
545 void Connection::Disconnect()
547 // Create and send DISCO packet
548 SharedBuffer<u8> data(2);
549 writeU8(&data[0], TYPE_CONTROL);
550 writeU8(&data[1], CONTROLTYPE_DISCO);
553 core::map<u16, Peer*>::Iterator j;
554 j = m_peers.getIterator();
555 for(; j.atEnd() == false; j++)
557 Peer *peer = j.getNode()->getValue();
558 SendAsPacket(peer->id, 0, data, false);
562 bool Connection::Connected()
564 if(m_peers.size() != 1)
567 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
571 if(m_peer_id == PEER_ID_INEXISTENT)
577 SharedBuffer<u8> Channel::ProcessPacket(
578 SharedBuffer<u8> packetdata,
584 IndentationRaiser iraiser(&(con->m_indentation));
586 if(packetdata.getSize() < 1)
587 throw InvalidIncomingDataException("packetdata.getSize() < 1");
589 u8 type = readU8(&packetdata[0]);
591 if(type == TYPE_CONTROL)
593 if(packetdata.getSize() < 2)
594 throw InvalidIncomingDataException("packetdata.getSize() < 2");
596 u8 controltype = readU8(&packetdata[1]);
598 if(controltype == CONTROLTYPE_ACK)
600 if(packetdata.getSize() < 4)
601 throw InvalidIncomingDataException
602 ("packetdata.getSize() < 4 (ACK header size)");
604 u16 seqnum = readU16(&packetdata[2]);
606 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
607 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
608 <<", seqnum="<<seqnum<<std::endl;
611 BufferedPacket p = outgoing_reliables.popSeqnum(seqnum);
612 // Get round trip time
613 float rtt = p.totaltime;
615 // Let peer calculate stuff according to it
616 // (avg_rtt and resend_timeout)
617 Peer *peer = con->GetPeer(peer_id);
618 peer->reportRTT(rtt);
620 //con->PrintInfo(dout_con);
621 //dout_con<<"RTT = "<<rtt<<std::endl;
623 /*dout_con<<"OUTGOING: ";
625 outgoing_reliables.print();
626 dout_con<<std::endl;*/
628 catch(NotFoundException &e){
629 con->PrintInfo(derr_con);
630 derr_con<<"WARNING: ACKed packet not "
635 throw ProcessedSilentlyException("Got an ACK");
637 else if(controltype == CONTROLTYPE_SET_PEER_ID)
639 if(packetdata.getSize() < 4)
640 throw InvalidIncomingDataException
641 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
642 u16 peer_id_new = readU16(&packetdata[2]);
644 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
646 if(con->GetPeerID() != PEER_ID_INEXISTENT)
648 con->PrintInfo(derr_con);
649 derr_con<<"WARNING: Not changing"
650 " existing peer id."<<std::endl;
654 dout_con<<"changing."<<std::endl;
655 con->SetPeerID(peer_id_new);
657 throw ProcessedSilentlyException("Got a SET_PEER_ID");
659 else if(controltype == CONTROLTYPE_PING)
661 // Just ignore it, the incoming data already reset
662 // the timeout counter
664 dout_con<<"PING"<<std::endl;
665 throw ProcessedSilentlyException("Got a PING");
667 else if(controltype == CONTROLTYPE_DISCO)
669 // Just ignore it, the incoming data already reset
670 // the timeout counter
672 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
674 if(con->deletePeer(peer_id, false) == false)
676 con->PrintInfo(derr_con);
677 derr_con<<"DISCO: Peer not found"<<std::endl;
680 throw ProcessedSilentlyException("Got a DISCO");
683 con->PrintInfo(derr_con);
684 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
685 <<((int)controltype&0xff)<<std::endl;
686 throw InvalidIncomingDataException("Invalid control type");
689 else if(type == TYPE_ORIGINAL)
691 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
692 throw InvalidIncomingDataException
693 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
695 dout_con<<"RETURNING TYPE_ORIGINAL to user"
697 // Get the inside packet out and return it
698 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
699 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
702 else if(type == TYPE_SPLIT)
704 // We have to create a packet again for buffering
705 // This isn't actually too bad an idea.
706 BufferedPacket packet = makePacket(
707 con->GetPeer(peer_id)->address,
709 con->GetProtocolID(),
713 SharedBuffer<u8> data = incoming_splits.insert(packet, reliable);
714 if(data.getSize() != 0)
717 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
718 <<"size="<<data.getSize()<<std::endl;
722 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
723 throw ProcessedSilentlyException("Buffered a split packet chunk");
725 else if(type == TYPE_RELIABLE)
727 // Recursive reliable packets not allowed
728 assert(reliable == false);
730 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
731 throw InvalidIncomingDataException
732 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
734 u16 seqnum = readU16(&packetdata[1]);
736 bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum);
737 bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum);
741 dout_con<<"BUFFERING";
742 else if(is_old_packet)
746 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
747 <<" next="<<next_incoming_seqnum;
748 dout_con<<" [sending CONTROLTYPE_ACK"
749 " to peer_id="<<peer_id<<"]";
753 //assert(incoming_reliables.size() < 100);
755 // Send a CONTROLTYPE_ACK
756 SharedBuffer<u8> reply(4);
757 writeU8(&reply[0], TYPE_CONTROL);
758 writeU8(&reply[1], CONTROLTYPE_ACK);
759 writeU16(&reply[2], seqnum);
760 con->SendAsPacket(peer_id, channelnum, reply, false);
762 //if(seqnum_higher(seqnum, next_incoming_seqnum))
766 dout_con<<"Buffering reliable packet (seqnum="
767 <<seqnum<<")"<<std::endl;*/
769 // This one comes later, buffer it.
770 // Actually we have to make a packet to buffer one.
771 // Well, we have all the ingredients, so just do it.
772 BufferedPacket packet = makePacket(
773 con->GetPeer(peer_id)->address,
775 con->GetProtocolID(),
779 incoming_reliables.insert(packet);
782 dout_con<<"INCOMING: ";
783 incoming_reliables.print();
784 dout_con<<std::endl;*/
786 catch(AlreadyExistsException &e)
790 throw ProcessedSilentlyException("Buffered future reliable packet");
792 //else if(seqnum_higher(next_incoming_seqnum, seqnum))
793 else if(is_old_packet)
795 // An old packet, dump it
796 throw InvalidIncomingDataException("Got an old reliable packet");
799 next_incoming_seqnum++;
801 // Get out the inside packet and re-process it
802 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
803 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
805 return ProcessPacket(payload, con, peer_id, channelnum, true);
809 con->PrintInfo(derr_con);
810 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
811 throw InvalidIncomingDataException("Invalid packet type");
814 // We should never get here.
815 // If you get here, add an exception or a return to some of the
816 // above conditionals.
818 throw BaseException("Error in Channel::ProcessPacket()");
821 SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con,
825 // Clear old packets from start of buffer
828 firstseqnum = incoming_reliables.getFirstSeqnum();
829 if(seqnum_higher(next_incoming_seqnum, firstseqnum))
830 incoming_reliables.popFirst();
834 // This happens if all packets are old
835 }catch(con::NotFoundException)
838 if(incoming_reliables.empty() == false)
840 if(firstseqnum == next_incoming_seqnum)
842 BufferedPacket p = incoming_reliables.popFirst();
844 peer_id = readPeerId(*p.data);
845 u8 channelnum = readChannel(*p.data);
846 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
849 dout_con<<"UNBUFFERING TYPE_RELIABLE"
851 <<" peer_id="<<peer_id
852 <<" channel="<<((int)channelnum&0xff)
855 next_incoming_seqnum++;
857 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
858 // Get out the inside packet and re-process it
859 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
860 memcpy(*payload, &p.data[headers_size], payload.getSize());
862 return ProcessPacket(payload, con, peer_id, channelnum, true);
866 throw NoIncomingDataException("No relevant data in buffers");
869 SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id)
871 core::map<u16, Peer*>::Iterator j;
872 j = m_peers.getIterator();
873 for(; j.atEnd() == false; j++)
875 Peer *peer = j.getNode()->getValue();
876 for(u16 i=0; i<CHANNEL_COUNT; i++)
878 Channel *channel = &peer->channels[i];
880 SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers
885 catch(NoIncomingDataException &e)
888 catch(InvalidIncomingDataException &e)
891 catch(ProcessedSilentlyException &e)
896 throw NoIncomingDataException("No relevant data in buffers");
899 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
902 Receive a packet from the network
905 // TODO: We can not know how many layers of header there are.
906 // For now, just assume there are no other than the base headers.
907 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
908 Buffer<u8> packetdata(packet_maxsize);
915 Check if some buffer has relevant data
918 SharedBuffer<u8> resultdata = GetFromBuffers(peer_id);
920 if(datasize < resultdata.getSize())
921 throw InvalidIncomingDataException
922 ("Buffer too small for received data");
924 memcpy(data, *resultdata, resultdata.getSize());
925 return resultdata.getSize();
927 catch(NoIncomingDataException &e)
933 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
935 if(received_size < 0)
936 throw NoIncomingDataException("No incoming data");
937 if(received_size < BASE_HEADER_SIZE)
938 throw InvalidIncomingDataException("No full header received");
939 if(readU32(&packetdata[0]) != m_protocol_id)
940 throw InvalidIncomingDataException("Invalid protocol id");
942 peer_id = readPeerId(*packetdata);
943 u8 channelnum = readChannel(*packetdata);
944 if(channelnum > CHANNEL_COUNT-1){
946 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
947 throw InvalidIncomingDataException("Channel doesn't exist");
950 if(peer_id == PEER_ID_INEXISTENT)
953 Somebody is trying to send stuff to us with no peer id.
955 Check if the same address and port was added to our peer
957 Allow only entries that have has_sent_with_id==false.
960 core::map<u16, Peer*>::Iterator j;
961 j = m_peers.getIterator();
962 for(; j.atEnd() == false; j++)
964 Peer *peer = j.getNode()->getValue();
965 if(peer->has_sent_with_id)
967 if(peer->address == sender)
972 If no peer was found with the same address and port,
973 we shall assume it is a new peer and create an entry.
977 // Pass on to adding the peer
979 // Else: A peer was found.
982 Peer *peer = j.getNode()->getValue();
985 derr_con<<"WARNING: Assuming unknown peer to be "
986 <<"peer_id="<<peer_id<<std::endl;
991 The peer was not found in our lists. Add it.
993 if(peer_id == PEER_ID_INEXISTENT)
995 // Somebody wants to make a new connection
997 // Get a unique peer id (2 or higher)
1000 Find an unused peer id
1005 if(m_peers.find(peer_id_new) == NULL)
1007 // Check for overflow
1008 if(peer_id_new == 65535)
1009 throw ConnectionException
1010 ("Connection ran out of peer ids");
1015 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
1016 " giving peer_id="<<peer_id_new<<std::endl;
1019 Peer *peer = new Peer(peer_id_new, sender);
1020 m_peers.insert(peer->id, peer);
1021 m_peerhandler->peerAdded(peer);
1023 // Create CONTROL packet to tell the peer id to the new peer.
1024 SharedBuffer<u8> reply(4);
1025 writeU8(&reply[0], TYPE_CONTROL);
1026 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
1027 writeU16(&reply[2], peer_id_new);
1028 SendAsPacket(peer_id_new, 0, reply, true);
1030 // We're now talking to a valid peer_id
1031 peer_id = peer_id_new;
1033 // Go on and process whatever it sent
1036 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1041 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
1042 // and it is invalid.
1043 PrintInfo(derr_con);
1044 derr_con<<"Receive(): Peer not found"<<std::endl;
1045 throw InvalidIncomingDataException("Peer not found (possible timeout)");
1048 Peer *peer = node->getValue();
1050 // Validate peer address
1051 if(peer->address != sender)
1053 PrintInfo(derr_con);
1054 derr_con<<"Peer "<<peer_id<<" sending from different address."
1055 " Ignoring."<<std::endl;
1056 throw InvalidIncomingDataException
1057 ("Peer sending from different address");
1058 /*// If there is more data, receive again
1059 if(m_socket.WaitData(0) == true)
1061 throw NoIncomingDataException("No incoming data (2)");*/
1064 peer->timeout_counter = 0.0;
1066 Channel *channel = &(peer->channels[channelnum]);
1068 // Throw the received packet to channel->processPacket()
1070 // Make a new SharedBuffer from the data without the base headers
1071 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1072 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1073 strippeddata.getSize());
1076 // Process it (the result is some data with no headers made by us)
1077 SharedBuffer<u8> resultdata = channel->ProcessPacket
1078 (strippeddata, this, peer_id, channelnum);
1081 dout_con<<"ProcessPacket returned data of size "
1082 <<resultdata.getSize()<<std::endl;
1084 if(datasize < resultdata.getSize())
1085 throw InvalidIncomingDataException
1086 ("Buffer too small for received data");
1088 memcpy(data, *resultdata, resultdata.getSize());
1089 return resultdata.getSize();
1091 catch(ProcessedSilentlyException &e)
1093 // If there is more data, receive again
1094 if(m_socket.WaitData(0) == true)
1097 throw NoIncomingDataException("No incoming data (2)");
1099 catch(InvalidIncomingDataException &e)
1101 // If there is more data, receive again
1102 if(m_socket.WaitData(0) == true)
1105 catch(SendFailedException &e)
1107 derr_con<<"Receive(): SendFailedException; peer_id="
1108 <<peer_id<<std::endl;
1113 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1115 core::map<u16, Peer*>::Iterator j;
1116 j = m_peers.getIterator();
1117 for(; j.atEnd() == false; j++)
1119 Peer *peer = j.getNode()->getValue();
1120 Send(peer->id, channelnum, data, reliable);
1124 void Connection::Send(u16 peer_id, u8 channelnum,
1125 SharedBuffer<u8> data, bool reliable)
1127 assert(channelnum < CHANNEL_COUNT);
1129 Peer *peer = GetPeerNoEx(peer_id);
1132 Channel *channel = &(peer->channels[channelnum]);
1134 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1136 chunksize_max -= RELIABLE_HEADER_SIZE;
1138 core::list<SharedBuffer<u8> > originals;
1139 originals = makeAutoSplitPacket(data, chunksize_max,
1140 channel->next_outgoing_split_seqnum);
1142 core::list<SharedBuffer<u8> >::Iterator i;
1143 i = originals.begin();
1144 for(; i != originals.end(); i++)
1146 SharedBuffer<u8> original = *i;
1148 SendAsPacket(peer_id, channelnum, original, reliable);
1152 void Connection::SendAsPacket(u16 peer_id, u8 channelnum,
1153 SharedBuffer<u8> data, bool reliable)
1155 Peer *peer = GetPeer(peer_id);
1156 Channel *channel = &(peer->channels[channelnum]);
1160 u16 seqnum = channel->next_outgoing_seqnum;
1161 channel->next_outgoing_seqnum++;
1163 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1165 // Add base headers and make a packet
1166 BufferedPacket p = makePacket(peer->address, reliable,
1167 m_protocol_id, m_peer_id, channelnum);
1170 // Buffer the packet
1171 channel->outgoing_reliables.insert(p);
1173 catch(AlreadyExistsException &e)
1175 PrintInfo(derr_con);
1176 derr_con<<"WARNING: Going to send a reliable packet "
1177 "seqnum="<<seqnum<<" that is already "
1178 "in outgoing buffer"<<std::endl;
1187 // Add base headers and make a packet
1188 BufferedPacket p = makePacket(peer->address, data,
1189 m_protocol_id, m_peer_id, channelnum);
1196 void Connection::RawSend(const BufferedPacket &packet)
1198 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1201 void Connection::RunTimeouts(float dtime)
1203 core::list<u16> timeouted_peers;
1204 core::map<u16, Peer*>::Iterator j;
1205 j = m_peers.getIterator();
1206 for(; j.atEnd() == false; j++)
1208 Peer *peer = j.getNode()->getValue();
1213 peer->timeout_counter += dtime;
1214 if(peer->timeout_counter > m_timeout)
1216 PrintInfo(derr_con);
1217 derr_con<<"RunTimeouts(): Peer "<<peer->id
1219 <<" (source=peer->timeout_counter)"
1221 // Add peer to the list
1222 timeouted_peers.push_back(peer->id);
1223 // Don't bother going through the buffers of this one
1227 float resend_timeout = peer->resend_timeout;
1228 for(u16 i=0; i<CHANNEL_COUNT; i++)
1230 core::list<BufferedPacket> timed_outs;
1231 core::list<BufferedPacket>::Iterator j;
1233 Channel *channel = &peer->channels[i];
1235 // Remove timed out incomplete unreliable split packets
1236 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
1238 // Increment reliable packet times
1239 channel->outgoing_reliables.incrementTimeouts(dtime);
1241 // Check reliable packet total times, remove peer if
1243 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
1245 PrintInfo(derr_con);
1246 derr_con<<"RunTimeouts(): Peer "<<peer->id
1248 <<" (source=reliable packet totaltime)"
1250 // Add peer to the to-be-removed list
1251 timeouted_peers.push_back(peer->id);
1255 // Re-send timed out outgoing reliables
1257 timed_outs = channel->
1258 outgoing_reliables.getTimedOuts(resend_timeout);
1260 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
1262 j = timed_outs.begin();
1263 for(; j != timed_outs.end(); j++)
1265 u16 peer_id = readPeerId(*(j->data));
1266 u8 channel = readChannel(*(j->data));
1267 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
1269 PrintInfo(derr_con);
1270 derr_con<<"RE-SENDING timed-out RELIABLE to ";
1271 j->address.print(&derr_con);
1272 derr_con<<"(t/o="<<resend_timeout<<"): "
1273 <<"from_peer_id="<<peer_id
1274 <<", channel="<<((int)channel&0xff)
1275 <<", seqnum="<<seqnum
1280 // Enlarge avg_rtt and resend_timeout:
1281 // The rtt will be at least the timeout.
1282 // NOTE: This won't affect the timeout of the next
1283 // checked channel because it was cached.
1284 peer->reportRTT(resend_timeout);
1291 peer->ping_timer += dtime;
1292 if(peer->ping_timer >= 5.0)
1294 // Create and send PING packet
1295 SharedBuffer<u8> data(2);
1296 writeU8(&data[0], TYPE_CONTROL);
1297 writeU8(&data[1], CONTROLTYPE_PING);
1298 SendAsPacket(peer->id, 0, data, true);
1300 peer->ping_timer = 0.0;
1307 // Remove timed out peers
1308 core::list<u16>::Iterator i = timeouted_peers.begin();
1309 for(; i != timeouted_peers.end(); i++)
1311 PrintInfo(derr_con);
1312 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1313 deletePeer(*i, true);
1317 Peer* Connection::GetPeer(u16 peer_id)
1319 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1323 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1327 assert(node->getValue()->id == peer_id);
1329 return node->getValue();
1332 Peer* Connection::GetPeerNoEx(u16 peer_id)
1334 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1341 assert(node->getValue()->id == peer_id);
1343 return node->getValue();
1346 core::list<Peer*> Connection::GetPeers()
1348 core::list<Peer*> list;
1349 core::map<u16, Peer*>::Iterator j;
1350 j = m_peers.getIterator();
1351 for(; j.atEnd() == false; j++)
1353 Peer *peer = j.getNode()->getValue();
1354 list.push_back(peer);
1359 bool Connection::deletePeer(u16 peer_id, bool timeout)
1361 if(m_peers.find(peer_id) == NULL)
1363 m_peerhandler->deletingPeer(m_peers[peer_id], timeout);
1364 delete m_peers[peer_id];
1365 m_peers.remove(peer_id);
1369 void Connection::PrintInfo(std::ostream &out)
1371 out<<m_socket.GetHandle();
1373 out<<"con "<<m_peer_id<<": ";
1374 for(s16 i=0; i<(s16)m_indentation-1; i++)
1378 void Connection::PrintInfo()
1380 PrintInfo(dout_con);