3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
32 static u16 readPeerId(u8 *packetdata)
34 return readU16(&packetdata[4]);
36 static u8 readChannel(u8 *packetdata)
38 return readU8(&packetdata[6]);
41 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
42 u32 protocol_id, u16 sender_peer_id, u8 channel)
44 u32 packet_size = datasize + BASE_HEADER_SIZE;
45 BufferedPacket p(packet_size);
48 writeU32(&p.data[0], protocol_id);
49 writeU16(&p.data[4], sender_peer_id);
50 writeU8(&p.data[6], channel);
52 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
57 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
58 u32 protocol_id, u16 sender_peer_id, u8 channel)
60 return makePacket(address, *data, data.getSize(),
61 protocol_id, sender_peer_id, channel);
64 SharedBuffer<u8> makeOriginalPacket(
65 SharedBuffer<u8> data)
68 u32 packet_size = data.getSize() + header_size;
69 SharedBuffer<u8> b(packet_size);
71 writeU8(&b[0], TYPE_ORIGINAL);
73 memcpy(&b[header_size], *data, data.getSize());
78 core::list<SharedBuffer<u8> > makeSplitPacket(
79 SharedBuffer<u8> data,
83 // Chunk packets, containing the TYPE_SPLIT header
84 core::list<SharedBuffer<u8> > chunks;
86 u32 chunk_header_size = 7;
87 u32 maximum_data_size = chunksize_max - chunk_header_size;
92 end = start + maximum_data_size - 1;
93 if(end > data.getSize() - 1)
94 end = data.getSize() - 1;
96 u32 payload_size = end - start + 1;
97 u32 packet_size = chunk_header_size + payload_size;
99 SharedBuffer<u8> chunk(packet_size);
101 writeU8(&chunk[0], TYPE_SPLIT);
102 writeU16(&chunk[1], seqnum);
103 // [3] u16 chunk_count is written at next stage
104 writeU16(&chunk[5], chunk_num);
105 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
107 chunks.push_back(chunk);
112 while(end != data.getSize() - 1);
114 u16 chunk_count = chunks.getSize();
116 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
117 for(; i != chunks.end(); i++)
120 writeU16(&((*i)[3]), chunk_count);
// Packs data into a list of packets: when the data plus the 1-byte
// original header is larger than chunksize_max it is split with
// makeSplitPacket(), otherwise a single makeOriginalPacket() is appended.
// NOTE(review): the remaining parameter lines and the tails of both
// branches (including what happens to split_seqnum) are not visible in this
// excerpt - confirm against the full file.
126 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
127 SharedBuffer<u8> data,
131 u32 original_header_size = 1;
132 core::list<SharedBuffer<u8> > list;
133 if(data.getSize() + original_header_size > chunksize_max)
135 list = makeSplitPacket(data, chunksize_max, split_seqnum);
141 list.push_back(makeOriginalPacket(data));
146 SharedBuffer<u8> makeReliablePacket(
147 SharedBuffer<u8> data,
150 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
151 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
152 <<((unsigned int)data[0]&0xff)<<std::endl;*/
154 u32 packet_size = data.getSize() + header_size;
155 SharedBuffer<u8> b(packet_size);
157 writeU8(&b[0], TYPE_RELIABLE);
158 writeU16(&b[1], seqnum);
160 memcpy(&b[header_size], *data, data.getSize());
162 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
163 <<((unsigned int)data[0]&0xff)<<std::endl;*/
164 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
// Debug helper: walks m_list and reads the u16 sequence number of every
// buffered packet (stored one byte after the base header).
// NOTE(review): the statement that outputs the value is not visible in
// this excerpt.
172 void ReliablePacketBuffer::print()
174 core::list<BufferedPacket>::Iterator i;
176 for(; i != m_list.end(); i++)
178 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
182 bool ReliablePacketBuffer::empty()
184 return m_list.empty();
186 u32 ReliablePacketBuffer::size()
188 return m_list.getSize();
190 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
192 core::list<BufferedPacket>::Iterator i;
194 for(; i != m_list.end(); i++)
196 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
197 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
198 <<", comparing to s="<<s<<std::endl;*/
// NOTE(review): the body is not visible in this excerpt. Judging by the
// name, this presumably returns the RPBSearchResult value that
// findPacket() yields when no packet matches - confirm.
204 RPBSearchResult ReliablePacketBuffer::notFound()
208 u16 ReliablePacketBuffer::getFirstSeqnum()
211 throw NotFoundException("Buffer is empty");
212 BufferedPacket p = *m_list.begin();
213 return readU16(&p.data[BASE_HEADER_SIZE+1]);
215 BufferedPacket ReliablePacketBuffer::popFirst()
218 throw NotFoundException("Buffer is empty");
219 BufferedPacket p = *m_list.begin();
220 core::list<BufferedPacket>::Iterator i = m_list.begin();
224 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
226 RPBSearchResult r = findPacket(seqnum);
228 dout_con<<"Not found"<<std::endl;
229 throw NotFoundException("seqnum not found in buffer");
231 BufferedPacket p = *r;
235 void ReliablePacketBuffer::insert(BufferedPacket &p)
237 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
238 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
239 assert(type == TYPE_RELIABLE);
240 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
242 // Find the right place for the packet and insert it there
244 // If list is empty, just add it
251 // Otherwise find the right place
252 core::list<BufferedPacket>::Iterator i;
254 // Find the first packet in the list which has a higher seqnum
255 for(; i != m_list.end(); i++){
256 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
258 throw AlreadyExistsException("Same seqnum in list");
260 if(seqnum_higher(s, seqnum)){
264 // If we're at the end of the list, add the packet to the
266 if(i == m_list.end())
273 m_list.insert_before(i, p);
// Adds dtime to the totaltime of every buffered packet.
// NOTE(review): one statement of the loop body is not visible in this
// excerpt; the resend timer (i->time, read by resetTimedOuts() and
// getTimedOuts()) is presumably advanced here as well - confirm.
276 void ReliablePacketBuffer::incrementTimeouts(float dtime)
278 core::list<BufferedPacket>::Iterator i;
280 for(; i != m_list.end(); i++){
282 i->totaltime += dtime;
// Visits every buffered packet whose resend timer (i->time) has reached
// timeout.
// NOTE(review): the action taken on a match is not visible in this
// excerpt; judging by the name it presumably resets the timer - confirm.
286 void ReliablePacketBuffer::resetTimedOuts(float timeout)
288 core::list<BufferedPacket>::Iterator i;
290 for(; i != m_list.end(); i++){
291 if(i->time >= timeout)
296 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
298 core::list<BufferedPacket>::Iterator i;
300 for(; i != m_list.end(); i++){
301 if(i->totaltime >= timeout)
307 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
309 core::list<BufferedPacket> timed_outs;
310 core::list<BufferedPacket>::Iterator i;
312 for(; i != m_list.end(); i++)
314 if(i->time >= timeout)
315 timed_outs.push_back(*i);
324 IncomingSplitBuffer::~IncomingSplitBuffer()
326 core::map<u16, IncomingSplitPacket*>::Iterator i;
327 i = m_buf.getIterator();
328 for(; i.atEnd() == false; i++)
330 delete i.getNode()->getValue();
334 This will throw a GotSplitPacketException when a full
335 split packet is constructed.
337 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
339 u32 headersize = BASE_HEADER_SIZE + 7;
340 assert(p.data.getSize() >= headersize);
341 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
342 assert(type == TYPE_SPLIT);
343 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
344 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
345 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
347 // Add if doesn't exist
348 if(m_buf.find(seqnum) == NULL)
350 IncomingSplitPacket *sp = new IncomingSplitPacket();
351 sp->chunk_count = chunk_count;
352 sp->reliable = reliable;
356 IncomingSplitPacket *sp = m_buf[seqnum];
358 // TODO: These errors should be thrown or something? Dunno.
359 if(chunk_count != sp->chunk_count)
360 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
361 <<" != sp->chunk_count="<<sp->chunk_count
363 if(reliable != sp->reliable)
364 derr_con<<"Connection: WARNING: reliable="<<reliable
365 <<" != sp->reliable="<<sp->reliable
368 // If chunk already exists, ignore it.
369 // Sometimes two identical packets may arrive when there is network
370 // lag and the server re-sends stuff.
371 if(sp->chunks.find(chunk_num) != NULL)
372 return SharedBuffer<u8>();
374 // Cut chunk data out of packet
375 u32 chunkdatasize = p.data.getSize() - headersize;
376 SharedBuffer<u8> chunkdata(chunkdatasize);
377 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
379 // Set chunk data in buffer
380 sp->chunks[chunk_num] = chunkdata;
382 // If not all chunks are received, return empty buffer
383 if(sp->allReceived() == false)
384 return SharedBuffer<u8>();
386 // Calculate total size
388 core::map<u16, SharedBuffer<u8> >::Iterator i;
389 i = sp->chunks.getIterator();
390 for(; i.atEnd() == false; i++)
392 totalsize += i.getNode()->getValue().getSize();
395 SharedBuffer<u8> fulldata(totalsize);
397 // Copy chunks to data buffer
399 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
402 SharedBuffer<u8> buf = sp->chunks[chunk_i];
403 u16 chunkdatasize = buf.getSize();
404 memcpy(&fulldata[start], *buf, chunkdatasize);
405 start += chunkdatasize;;
408 // Remove sp from buffer
409 m_buf.remove(seqnum);
// Finds incomplete split packets that are not reliable and whose age
// (p->time) has reached timeout. Their keys are first collected into
// remove_queue and then handled in a second loop, after the iteration
// over m_buf has finished.
// NOTE(review): the statements that advance p->time by dtime and that
// erase the queued entries are not visible in this excerpt - confirm.
414 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
416 core::list<u16> remove_queue;
417 core::map<u16, IncomingSplitPacket*>::Iterator i;
418 i = m_buf.getIterator();
419 for(; i.atEnd() == false; i++)
421 IncomingSplitPacket *p = i.getNode()->getValue();
422 // Reliable ones are not removed by timeout
423 if(p->reliable == true)
426 if(p->time >= timeout)
427 remove_queue.push_back(i.getNode()->getKey());
429 core::list<u16>::Iterator j;
430 j = remove_queue.begin();
431 for(; j != remove_queue.end(); j++)
433 dout_con<<"NOTE: Removing timed out unreliable split packet"
// Starts all three sequence counters at SEQNUM_INITIAL.
// NOTE(review): the header of the enclosing definition is not visible in
// this excerpt; elsewhere in the file these members are accessed through
// Channel pointers, so this is presumably the Channel constructor - confirm.
446 next_outgoing_seqnum = SEQNUM_INITIAL;
447 next_incoming_seqnum = SEQNUM_INITIAL;
448 next_outgoing_split_seqnum = SEQNUM_INITIAL;
// Creates a peer entry. The visible initializers start the timeout counter
// at zero, mark the peer as not yet having sent anything with its own id
// and start the send rate limit at 10 packets per second.
// NOTE(review): the remaining initializers and the body are not visible
// in this excerpt.
458 Peer::Peer(u16 a_id, Address a_address):
461 timeout_counter(0.0),
465 has_sent_with_id(false),
467 m_max_packets_per_second(10),
// Feeds one round-trip time measurement into the peer's statistics:
//  1. adapts m_max_packets_per_second (raised in steps of 10 up to 400 or
//     in steps of 2 up to 100, otherwise scaled by 0.8 with a floor of 10),
//  2. updates avg_rtt as a moving average (10% new sample, 90% history),
//  3. derives resend_timeout from avg_rtt.
// NOTE(review): the conditions opening the first branch and the handling
// of a negative avg_rtt are not visible in this excerpt - confirm the
// exact thresholds against the full file.
476 void Peer::reportRTT(float rtt)
480 if(m_max_packets_per_second < 400)
481 m_max_packets_per_second += 10;
482 } else if(rtt < 0.2){
483 if(m_max_packets_per_second < 100)
484 m_max_packets_per_second += 2;
486 m_max_packets_per_second *= 0.8;
487 if(m_max_packets_per_second < 10)
488 m_max_packets_per_second = 10;
494 else if(avg_rtt < 0.0)
497 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
499 // Calculate resend_timeout
501 /*int reliable_count = 0;
502 for(int i=0; i<CHANNEL_COUNT; i++)
504 reliable_count += channels[i].outgoing_reliables.size();
506 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
507 * ((float)reliable_count * 1);*/
// The timeout scales with the average RTT and is clamped to
// [RESEND_TIMEOUT_MIN, RESEND_TIMEOUT_MAX]
509 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
510 if(timeout < RESEND_TIMEOUT_MIN)
511 timeout = RESEND_TIMEOUT_MIN;
512 if(timeout > RESEND_TIMEOUT_MAX)
513 timeout = RESEND_TIMEOUT_MAX;
514 resend_timeout = timeout;
// Creates a connection without a peer handler (m_bc_peerhandler is NULL)
// and sets the socket timeout to 5 ms.
// NOTE(review): part of the initializer list and of the body is not
// visible in this excerpt.
521 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
522 m_protocol_id(protocol_id),
523 m_max_packet_size(max_packet_size),
526 m_bc_peerhandler(NULL),
527 m_bc_receive_timeout(0),
530 m_socket.setTimeoutMs(5);
// Same as the three-argument constructor, but stores the given peerhandler
// in m_bc_peerhandler. The socket timeout is set to 5 ms.
// NOTE(review): part of the initializer list and of the body is not
// visible in this excerpt.
535 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
536 PeerHandler *peerhandler):
537 m_protocol_id(protocol_id),
538 m_max_packet_size(max_packet_size),
541 m_bc_peerhandler(peerhandler),
542 m_bc_receive_timeout(0),
545 m_socket.setTimeoutMs(5);
// Walks over all entries of m_peers on destruction.
// NOTE(review): the statements preceding the loop and the per-peer
// cleanup are not visible in this excerpt; the peers are created with new
// elsewhere in this file, so they are presumably deleted here - confirm.
551 Connection::~Connection()
555 for(core::map<u16, Peer*>::Iterator
556 j = m_peers.getIterator();
557 j.atEnd() == false; j++)
559 Peer *peer = j.getNode()->getValue();
// Entry point of the connection thread. Registers the thread for logging,
// then measures the time elapsed between iterations (dtime, in seconds,
// from millisecond timestamps) and drains m_command_queue.
// NOTE(review): the loop construct and the calls made with dtime and with
// each popped command are not visible in this excerpt.
566 void * Connection::Thread()
569 log_register_thread("Connection");
571 dout_con<<"Connection thread started"<<std::endl;
573 u32 curtime = porting::getTimeMs();
574 u32 lasttime = curtime;
578 BEGIN_DEBUG_EXCEPTION_HANDLER
581 curtime = porting::getTimeMs();
// Milliseconds to seconds
582 float dtime = (float)(curtime - lasttime) / 1000.;
590 while(m_command_queue.size() != 0){
591 ConnectionCommand c = m_command_queue.pop_front();
599 END_DEBUG_EXCEPTION_HANDLER(derr_con);
605 void Connection::putEvent(ConnectionEvent &e)
607 assert(e.type != CONNEVENT_NONE);
608 m_event_queue.push_back(e);
// Executes one queued command by dispatching on its type; every command
// is logged to dout_con first. The visible cases forward to send(),
// sendToAll() and deletePeer().
// NOTE(review): the switch statement itself, several case labels and the
// calls made for SERVE/CONNECT/DISCONNECT are not visible in this excerpt.
611 void Connection::processCommand(ConnectionCommand &c)
615 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
618 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
622 case CONNCMD_CONNECT:
623 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
626 case CONNCMD_DISCONNECT:
627 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
631 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
632 send(c.peer_id, c.channelnum, c.data, c.reliable);
634 case CONNCMD_SEND_TO_ALL:
635 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
636 sendToAll(c.channelnum, c.data, c.reliable);
638 case CONNCMD_DELETE_PEER:
639 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
// false: the removal is not reported as a timeout
640 deletePeer(c.peer_id, false);
// Flushes m_outgoing_queue with a per-peer rate limit.
//  1. Every peer accumulates dtime in m_sendtime_accu; the number of
//     packets it may send now is that time multiplied by
//     m_max_packets_per_second.
//  2. Each queued packet is postponed if its channel already holds 5 or
//     more outgoing reliables or if the peer's budget is used up;
//     otherwise it is sent with rawSendAsPacket().
//  3. Postponed packets are put back into m_outgoing_queue in order.
//  4. The time corresponding to the packets sent is subtracted from the
//     accumulator, which is capped at the time needed for 10 packets.
// NOTE(review): the handling of a NULL result from getPeerNoEx() and the
// increment of m_num_sent are not visible in this excerpt - confirm.
645 void Connection::send(float dtime)
647 for(core::map<u16, Peer*>::Iterator
648 j = m_peers.getIterator();
649 j.atEnd() == false; j++)
651 Peer *peer = j.getNode()->getValue();
652 peer->m_sendtime_accu += dtime;
653 peer->m_num_sent = 0;
654 peer->m_max_num_sent = peer->m_sendtime_accu *
655 peer->m_max_packets_per_second;
657 Queue<OutgoingPacket> postponed_packets;
658 while(m_outgoing_queue.size() != 0){
659 OutgoingPacket packet = m_outgoing_queue.pop_front();
660 Peer *peer = getPeerNoEx(packet.peer_id);
663 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
664 postponed_packets.push_back(packet);
665 } else if(peer->m_num_sent < peer->m_max_num_sent){
666 rawSendAsPacket(packet.peer_id, packet.channelnum,
667 packet.data, packet.reliable);
670 postponed_packets.push_back(packet);
673 while(postponed_packets.size() != 0){
674 m_outgoing_queue.push_back(postponed_packets.pop_front());
676 for(core::map<u16, Peer*>::Iterator
677 j = m_peers.getIterator();
678 j.atEnd() == false; j++)
680 Peer *peer = j.getNode()->getValue();
681 peer->m_sendtime_accu -= (float)peer->m_num_sent /
682 peer->m_max_packets_per_second;
683 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
684 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
688 // Receive packets from the network and buffers and create ConnectionEvents
//
// Per iteration: data that became deliverable from the incoming buffers is
// turned into an event first; otherwise one datagram is read from the
// socket, validated (size, protocol id, channel, peer id, sender address),
// stripped of the base header and passed to processPacket(). Data returned
// by processPacket() is queued as a dataReceived event.
// Packets arriving with PEER_ID_INEXISTENT are matched by address against
// peers that have not yet sent with an id of their own; if none matches, a
// new Peer with an unused id is created and told its id with a
// CONTROLTYPE_SET_PEER_ID packet.
// NOTE(review): the loop construct, the try block and several
// continue/break statements are not visible in this excerpt.
689 void Connection::receive()
691 u32 datasize = m_max_packet_size * 2; // Double it just to be safe
692 // TODO: We can not know how many layers of header there are.
693 // For now, just assume there are no other than the base headers.
694 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
695 SharedBuffer<u8> packetdata(packet_maxsize);
697 bool single_wait_done = false;
702 /* Check if some buffer has relevant data */
705 SharedBuffer<u8> resultdata;
706 bool got = getFromBuffers(peer_id, resultdata);
709 e.dataReceived(peer_id, resultdata);
// After the first pass the socket is only polled (zero wait)
715 if(single_wait_done){
716 if(m_socket.WaitData(0) == false)
720 single_wait_done = true;
723 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
725 if(received_size < 0)
727 if(received_size < BASE_HEADER_SIZE)
729 if(readU32(&packetdata[0]) != m_protocol_id)
732 u16 peer_id = readPeerId(*packetdata);
733 u8 channelnum = readChannel(*packetdata);
734 if(channelnum > CHANNEL_COUNT-1){
// NOTE(review): channelnum is a u8 and is streamed here without the
// (int) cast used for channel numbers elsewhere in this file, so it is
// likely printed as a raw character - confirm.
736 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
737 throw InvalidIncomingDataException("Channel doesn't exist");
740 if(peer_id == PEER_ID_INEXISTENT)
743 Somebody is trying to send stuff to us with no peer id.
745 Check if the same address and port was added to our peer
747 Allow only entries that have has_sent_with_id==false.
750 core::map<u16, Peer*>::Iterator j;
751 j = m_peers.getIterator();
752 for(; j.atEnd() == false; j++)
754 Peer *peer = j.getNode()->getValue();
755 if(peer->has_sent_with_id)
757 if(peer->address == sender)
762 If no peer was found with the same address and port,
763 we shall assume it is a new peer and create an entry.
767 // Pass on to adding the peer
769 // Else: A peer was found.
772 Peer *peer = j.getNode()->getValue();
775 derr_con<<"WARNING: Assuming unknown peer to be "
776 <<"peer_id="<<peer_id<<std::endl;
781 The peer was not found in our lists. Add it.
783 if(peer_id == PEER_ID_INEXISTENT)
785 // Somebody wants to make a new connection
787 // Get a unique peer id (2 or higher)
790 Find an unused peer id
792 bool out_of_ids = false;
796 if(m_peers.find(peer_id_new) == NULL)
798 // Check for overflow
799 if(peer_id_new == 65535){
806 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
811 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
812 " giving peer_id="<<peer_id_new<<std::endl;
815 Peer *peer = new Peer(peer_id_new, sender);
816 m_peers.insert(peer->id, peer);
818 // Create peer addition event
820 e.peerAdded(peer_id_new, sender);
823 // Create CONTROL packet to tell the peer id to the new peer.
824 SharedBuffer<u8> reply(4);
825 writeU8(&reply[0], TYPE_CONTROL);
826 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
827 writeU16(&reply[2], peer_id_new);
828 sendAsPacket(peer_id_new, 0, reply, true);
830 // We're now talking to a valid peer_id
831 peer_id = peer_id_new;
833 // Go on and process whatever it sent
836 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
841 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
842 // and it is invalid.
844 derr_con<<"Receive(): Peer not found"<<std::endl;
845 throw InvalidIncomingDataException("Peer not found (possible timeout)");
848 Peer *peer = node->getValue();
850 // Validate peer address
851 if(peer->address != sender)
854 derr_con<<"Peer "<<peer_id<<" sending from different address."
855 " Ignoring."<<std::endl;
// Any accepted packet counts as a sign of life from the peer
859 peer->timeout_counter = 0.0;
861 Channel *channel = &(peer->channels[channelnum]);
863 // Throw the received packet to channel->processPacket()
865 // Make a new SharedBuffer from the data without the base headers
866 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
867 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
868 strippeddata.getSize());
871 // Process it (the result is some data with no headers made by us)
872 SharedBuffer<u8> resultdata = processPacket
873 (channel, strippeddata, peer_id, channelnum, false);
876 dout_con<<"ProcessPacket returned data of size "
877 <<resultdata.getSize()<<std::endl;
880 e.dataReceived(peer_id, resultdata);
883 }catch(ProcessedSilentlyException &e){
885 }catch(InvalidIncomingDataException &e){
887 catch(ProcessedSilentlyException &e){
// Periodic housekeeping, called with the time elapsed since the last call.
// For every peer:
//  - the timeout counter is advanced; peers exceeding m_timeout are queued
//    for removal,
//  - on each channel, timed out unreliable split packets are dropped,
//    reliable packet timers are advanced, the peer is queued for removal
//    if any reliable packet has been pending for m_timeout, and timed out
//    reliables are collected for re-sending,
//  - a PING control packet is sent whenever ping_timer reaches 5.0.
// Queued peers are removed at the end with deletePeer(id, true).
// NOTE(review): the re-send call itself and several continue/goto style
// statements are not visible in this excerpt.
892 void Connection::runTimeouts(float dtime)
894 core::list<u16> timeouted_peers;
895 core::map<u16, Peer*>::Iterator j;
896 j = m_peers.getIterator();
897 for(; j.atEnd() == false; j++)
899 Peer *peer = j.getNode()->getValue();
904 peer->timeout_counter += dtime;
905 if(peer->timeout_counter > m_timeout)
908 derr_con<<"RunTimeouts(): Peer "<<peer->id
910 <<" (source=peer->timeout_counter)"
912 // Add peer to the list
913 timeouted_peers.push_back(peer->id);
914 // Don't bother going through the buffers of this one
// Cached so that reportRTT() calls below don't affect the other channels
918 float resend_timeout = peer->resend_timeout;
919 for(u16 i=0; i<CHANNEL_COUNT; i++)
921 core::list<BufferedPacket> timed_outs;
922 core::list<BufferedPacket>::Iterator j;
924 Channel *channel = &peer->channels[i];
926 // Remove timed out incomplete unreliable split packets
927 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
929 // Increment reliable packet times
930 channel->outgoing_reliables.incrementTimeouts(dtime);
932 // Check reliable packet total times, remove peer if
934 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
937 derr_con<<"RunTimeouts(): Peer "<<peer->id
939 <<" (source=reliable packet totaltime)"
941 // Add peer to the to-be-removed list
942 timeouted_peers.push_back(peer->id);
946 // Re-send timed out outgoing reliables
948 timed_outs = channel->
949 outgoing_reliables.getTimedOuts(resend_timeout);
951 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
953 j = timed_outs.begin();
954 for(; j != timed_outs.end(); j++)
956 u16 peer_id = readPeerId(*(j->data));
957 u8 channel = readChannel(*(j->data));
958 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
961 derr_con<<"RE-SENDING timed-out RELIABLE to ";
962 j->address.print(&derr_con);
963 derr_con<<"(t/o="<<resend_timeout<<"): "
964 <<"from_peer_id="<<peer_id
965 <<", channel="<<((int)channel&0xff)
966 <<", seqnum="<<seqnum
971 // Enlarge avg_rtt and resend_timeout:
972 // The rtt will be at least the timeout.
973 // NOTE: This won't affect the timeout of the next
974 // checked channel because it was cached.
975 peer->reportRTT(resend_timeout);
982 peer->ping_timer += dtime;
983 if(peer->ping_timer >= 5.0)
985 // Create and send PING packet
986 SharedBuffer<u8> data(2);
987 writeU8(&data[0], TYPE_CONTROL);
988 writeU8(&data[1], CONTROLTYPE_PING);
989 rawSendAsPacket(peer->id, 0, data, true);
991 peer->ping_timer = 0.0;
998 // Remove timed out peers
999 core::list<u16>::Iterator i = timeouted_peers.begin();
1000 for(; i != timeouted_peers.end(); i++)
1002 PrintInfo(derr_con);
1003 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1004 deletePeer(*i, true);
// Starts acting as a server: binds the socket to port and takes the peer
// id PEER_ID_SERVER.
// NOTE(review): the body of the SocketException handler is not visible in
// this excerpt.
1008 void Connection::serve(u16 port)
1010 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1012 m_socket.Bind(port);
1013 m_peer_id = PEER_ID_SERVER;
1015 catch(SocketException &e){
// Starts connecting to the server at address: registers the server as
// peer PEER_ID_SERVER, reports it with a peerAdded event and sends an
// empty reliable packet while our own id is still PEER_ID_INEXISTENT.
// Throws ConnectionException("Already connected to a server").
// NOTE(review): the condition guarding the throw, the queuing of the event
// and the socket setup are not visible in this excerpt.
1023 void Connection::connect(Address address)
1025 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1026 <<":"<<address.getPort()<<std::endl;
1028 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1030 throw ConnectionException("Already connected to a server");
1033 Peer *peer = new Peer(PEER_ID_SERVER, address);
1034 m_peers.insert(peer->id, peer);
1038 e.peerAdded(peer->id, peer->address);
1043 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1044 m_peer_id = PEER_ID_INEXISTENT;
1045 SharedBuffer<u8> data(0);
1046 Send(PEER_ID_SERVER, 0, data, true);
1049 void Connection::disconnect()
1051 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1053 // Create and send DISCO packet
1054 SharedBuffer<u8> data(2);
1055 writeU8(&data[0], TYPE_CONTROL);
1056 writeU8(&data[1], CONTROLTYPE_DISCO);
1059 core::map<u16, Peer*>::Iterator j;
1060 j = m_peers.getIterator();
1061 for(; j.atEnd() == false; j++)
1063 Peer *peer = j.getNode()->getValue();
1064 rawSendAsPacket(peer->id, 0, data, false);
1068 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1070 core::map<u16, Peer*>::Iterator j;
1071 j = m_peers.getIterator();
1072 for(; j.atEnd() == false; j++)
1074 Peer *peer = j.getNode()->getValue();
1075 send(peer->id, channelnum, data, reliable);
// Queues user data for one peer: the data is wrapped with
// makeAutoSplitPacket() (one original packet, or several split chunks if
// it does not fit) and every resulting packet is queued with
// sendAsPacket().
// The chunk size limit is the maximum packet size minus the base header,
// further reduced by RELIABLE_HEADER_SIZE.
// NOTE(review): the handling of a NULL result from getPeerNoEx() and the
// condition under which RELIABLE_HEADER_SIZE is subtracted are not visible
// in this excerpt - confirm.
1079 void Connection::send(u16 peer_id, u8 channelnum,
1080 SharedBuffer<u8> data, bool reliable)
1082 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1084 assert(channelnum < CHANNEL_COUNT);
1086 Peer *peer = getPeerNoEx(peer_id);
1089 Channel *channel = &(peer->channels[channelnum]);
1091 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1093 chunksize_max -= RELIABLE_HEADER_SIZE;
1095 core::list<SharedBuffer<u8> > originals;
1096 originals = makeAutoSplitPacket(data, chunksize_max,
1097 channel->next_outgoing_split_seqnum);
1099 core::list<SharedBuffer<u8> >::Iterator i;
1100 i = originals.begin();
1101 for(; i != originals.end(); i++)
1103 SharedBuffer<u8> original = *i;
1105 sendAsPacket(peer_id, channelnum, original, reliable);
1109 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1110 SharedBuffer<u8> data, bool reliable)
1112 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1113 m_outgoing_queue.push_back(packet);
// Wraps data into a network packet for one peer.
// In the reliable path the channel's next outgoing sequence number is
// consumed, the data gets a reliable header and the finished packet is
// stored in outgoing_reliables (a duplicate sequence number only causes a
// warning). In the other path only the base header is added.
// NOTE(review): the condition selecting the path, the handling of a NULL
// result from getPeerNoEx() and the calls that hand the packet to the
// socket are not visible in this excerpt - confirm.
1116 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1117 SharedBuffer<u8> data, bool reliable)
1119 Peer *peer = getPeerNoEx(peer_id);
1122 Channel *channel = &(peer->channels[channelnum]);
1126 u16 seqnum = channel->next_outgoing_seqnum;
1127 channel->next_outgoing_seqnum++;
1129 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1131 // Add base headers and make a packet
1132 BufferedPacket p = makePacket(peer->address, reliable,
1133 m_protocol_id, m_peer_id, channelnum);
1136 // Buffer the packet
1137 channel->outgoing_reliables.insert(p);
1139 catch(AlreadyExistsException &e)
1141 PrintInfo(derr_con);
1142 derr_con<<"WARNING: Going to send a reliable packet "
1143 "seqnum="<<seqnum<<" that is already "
1144 "in outgoing buffer"<<std::endl;
1153 // Add base headers and make a packet
1154 BufferedPacket p = makePacket(peer->address, data,
1155 m_protocol_id, m_peer_id, channelnum);
1162 void Connection::rawSend(const BufferedPacket &packet)
1165 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1166 } catch(SendFailedException &e){
1167 derr_con<<"Connection::rawSend(): SendFailedException: "
1168 <<packet.address.serializeString()<<std::endl;
1172 Peer* Connection::getPeer(u16 peer_id)
1174 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1177 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1181 assert(node->getValue()->id == peer_id);
1183 return node->getValue();
1186 Peer* Connection::getPeerNoEx(u16 peer_id)
1188 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1195 assert(node->getValue()->id == peer_id);
1197 return node->getValue();
1200 core::list<Peer*> Connection::getPeers()
1202 core::list<Peer*> list;
1203 core::map<u16, Peer*>::Iterator j;
1204 j = m_peers.getIterator();
1205 for(; j.atEnd() == false; j++)
1207 Peer *peer = j.getNode()->getValue();
1208 list.push_back(peer);
1213 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1215 core::map<u16, Peer*>::Iterator j;
1216 j = m_peers.getIterator();
1217 for(; j.atEnd() == false; j++)
1219 Peer *peer = j.getNode()->getValue();
1220 for(u16 i=0; i<CHANNEL_COUNT; i++)
1222 Channel *channel = &peer->channels[i];
1223 SharedBuffer<u8> resultdata;
1224 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
// Tries to deliver the next in-order reliable packet buffered on channel.
// Packets at the start of the buffer that are older than
// next_incoming_seqnum are discarded first. If the first remaining packet
// carries exactly next_incoming_seqnum, it is removed from the buffer, the
// expected sequence number is advanced, the base and reliable headers are
// stripped and the payload is run through processPacket(); peer_id is set
// from the packet's base header and the result is stored in dst.
// NOTE(review): the loop around the cleanup, the body of the exception
// handler and the return statements are not visible in this excerpt.
1234 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1235 SharedBuffer<u8> &dst)
1237 u16 firstseqnum = 0;
1238 // Clear old packets from start of buffer
1241 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1242 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1243 channel->incoming_reliables.popFirst();
1247 // This happens if all packets are old
1248 }catch(con::NotFoundException)
1251 if(channel->incoming_reliables.empty() == false)
1253 if(firstseqnum == channel->next_incoming_seqnum)
1255 BufferedPacket p = channel->incoming_reliables.popFirst();
1257 peer_id = readPeerId(*p.data);
1258 u8 channelnum = readChannel(*p.data);
1259 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1262 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1263 <<" seqnum="<<seqnum
1264 <<" peer_id="<<peer_id
1265 <<" channel="<<((int)channelnum&0xff)
1268 channel->next_incoming_seqnum++;
1270 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1271 // Get out the inside packet and re-process it
1272 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1273 memcpy(*payload, &p.data[headers_size], payload.getSize());
1275 dst = processPacket(channel, payload, peer_id, channelnum, true);
// Processes one packet whose base header has already been stripped and
// dispatches on its first byte:
//  - TYPE_CONTROL: ACK (removes the acknowledged packet from
//    outgoing_reliables and reports its totaltime as RTT), SET_PEER_ID,
//    PING and DISCO are handled internally and always end in a
//    ProcessedSilentlyException,
//  - TYPE_ORIGINAL: the payload after the original header is extracted,
//  - TYPE_SPLIT: the chunk is re-wrapped with makePacket() and given to
//    incoming_splits; data is available once the split packet is complete,
//  - TYPE_RELIABLE: an ACK is sent back; future packets are buffered, old
//    ones rejected, and the expected one is unwrapped and processed
//    recursively with reliable=true.
// Throws InvalidIncomingDataException for malformed or unknown packets and
// ProcessedSilentlyException when there is nothing to hand to the user.
// NOTE(review): the return statements of the ORIGINAL and SPLIT branches
// and some arguments of the makePacket() calls are not visible in this
// excerpt.
1282 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1283 SharedBuffer<u8> packetdata, u16 peer_id,
1284 u8 channelnum, bool reliable)
1286 IndentationRaiser iraiser(&(m_indentation));
1288 if(packetdata.getSize() < 1)
1289 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1291 u8 type = readU8(&packetdata[0]);
1293 if(type == TYPE_CONTROL)
1295 if(packetdata.getSize() < 2)
1296 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1298 u8 controltype = readU8(&packetdata[1]);
1300 if(controltype == CONTROLTYPE_ACK)
1302 if(packetdata.getSize() < 4)
1303 throw InvalidIncomingDataException
1304 ("packetdata.getSize() < 4 (ACK header size)");
1306 u16 seqnum = readU16(&packetdata[2]);
1308 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1309 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1310 <<", seqnum="<<seqnum<<std::endl;
1313 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1314 // Get round trip time
1315 float rtt = p.totaltime;
1317 // Let peer calculate stuff according to it
1318 // (avg_rtt and resend_timeout)
1319 Peer *peer = getPeer(peer_id);
1320 peer->reportRTT(rtt);
1322 //PrintInfo(dout_con);
1323 //dout_con<<"RTT = "<<rtt<<std::endl;
1325 /*dout_con<<"OUTGOING: ";
1327 channel->outgoing_reliables.print();
1328 dout_con<<std::endl;*/
1330 catch(NotFoundException &e){
1331 PrintInfo(derr_con);
1332 derr_con<<"WARNING: ACKed packet not "
1337 throw ProcessedSilentlyException("Got an ACK");
1339 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1341 if(packetdata.getSize() < 4)
1342 throw InvalidIncomingDataException
1343 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1344 u16 peer_id_new = readU16(&packetdata[2]);
1346 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
// An id that has already been assigned is never overwritten
1348 if(GetPeerID() != PEER_ID_INEXISTENT)
1350 PrintInfo(derr_con);
1351 derr_con<<"WARNING: Not changing"
1352 " existing peer id."<<std::endl;
1356 dout_con<<"changing."<<std::endl;
1357 SetPeerID(peer_id_new);
1359 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1361 else if(controltype == CONTROLTYPE_PING)
1363 // Just ignore it, the incoming data already reset
1364 // the timeout counter
1366 dout_con<<"PING"<<std::endl;
1367 throw ProcessedSilentlyException("Got a PING");
1369 else if(controltype == CONTROLTYPE_DISCO)
1371 // The peer announced that it is disconnecting;
1372 // remove it (not reported as a timeout)
1374 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1376 if(deletePeer(peer_id, false) == false)
1378 PrintInfo(derr_con);
1379 derr_con<<"DISCO: Peer not found"<<std::endl;
1382 throw ProcessedSilentlyException("Got a DISCO");
1385 PrintInfo(derr_con);
1386 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1387 <<((int)controltype&0xff)<<std::endl;
1388 throw InvalidIncomingDataException("Invalid control type");
1391 else if(type == TYPE_ORIGINAL)
1393 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1394 throw InvalidIncomingDataException
1395 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1397 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1399 // Get the inside packet out and return it
1400 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1401 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1404 else if(type == TYPE_SPLIT)
// NOTE(review): unlike the other branches, no length check against the
// split header size is visible here before the packet is buffered -
// confirm that short packets are rejected somewhere.
1406 // We have to create a packet again for buffering
1407 // This isn't actually too bad an idea.
1408 BufferedPacket packet = makePacket(
1409 getPeer(peer_id)->address,
1414 // Buffer the packet
1415 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1416 if(data.getSize() != 0)
1419 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1420 <<"size="<<data.getSize()<<std::endl;
1424 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1425 throw ProcessedSilentlyException("Buffered a split packet chunk");
1427 else if(type == TYPE_RELIABLE)
1429 // Recursive reliable packets not allowed
1430 assert(reliable == false);
1432 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1433 throw InvalidIncomingDataException
1434 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1436 u16 seqnum = readU16(&packetdata[1]);
1438 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1439 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1442 if(is_future_packet)
1443 dout_con<<"BUFFERING";
1444 else if(is_old_packet)
1448 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1449 <<" next="<<channel->next_incoming_seqnum;
1450 dout_con<<" [sending CONTROLTYPE_ACK"
1451 " to peer_id="<<peer_id<<"]";
1452 dout_con<<std::endl;
1455 //assert(channel->incoming_reliables.size() < 100);
// The ACK is sent for future and old packets as well
1457 // Send a CONTROLTYPE_ACK
1458 SharedBuffer<u8> reply(4);
1459 writeU8(&reply[0], TYPE_CONTROL);
1460 writeU8(&reply[1], CONTROLTYPE_ACK);
1461 writeU16(&reply[2], seqnum);
1462 rawSendAsPacket(peer_id, channelnum, reply, false);
1464 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1465 if(is_future_packet)
1468 dout_con<<"Buffering reliable packet (seqnum="
1469 <<seqnum<<")"<<std::endl;*/
1471 // This one comes later, buffer it.
1472 // Actually we have to make a packet to buffer one.
1473 // Well, we have all the ingredients, so just do it.
1474 BufferedPacket packet = makePacket(
1475 getPeer(peer_id)->address,
1481 channel->incoming_reliables.insert(packet);
1484 dout_con<<"INCOMING: ";
1485 channel->incoming_reliables.print();
1486 dout_con<<std::endl;*/
1488 catch(AlreadyExistsException &e)
1492 throw ProcessedSilentlyException("Buffered future reliable packet");
1494 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1495 else if(is_old_packet)
1497 // An old packet, dump it
1498 throw InvalidIncomingDataException("Got an old reliable packet");
1501 channel->next_incoming_seqnum++;
1503 // Get out the inside packet and re-process it
1504 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1505 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1507 return processPacket(channel, payload, peer_id, channelnum, true);
1511 PrintInfo(derr_con);
1512 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1513 throw InvalidIncomingDataException("Invalid packet type");
1516 // We should never get here.
1517 // If you get here, add an exception or a return to some of the
1518 // above conditionals.
1520 throw BaseException("Error in Channel::ProcessPacket()");
// Removes the peer registered under peer_id: looks it up in m_peers, reports
// the removal through e.peerRemoved() (passing the timeout flag and the
// peer's address), then deletes the Peer object and erases its map entry.
// NOTE(review): the declaration of `e`, what happens to `e` afterwards and
// the return statements are not visible in this excerpt -- confirm against
// the full file.
1523 bool Connection::deletePeer(u16 peer_id, bool timeout)
// Guard: peer_id has no entry in m_peers.
1525 if(m_peers.find(peer_id) == NULL)
1528 Peer *peer = m_peers[peer_id];
// The address is read from the peer before the object is deleted below.
1532 e.peerRemoved(peer_id, timeout, peer->address);
// Free the Peer object first, then drop its (now dangling) map entry.
1535 delete m_peers[peer_id];
1536 m_peers.remove(peer_id);
// Fetches the next event from m_event_queue.
// When the queue is empty, an event whose type is CONNEVENT_NONE is prepared;
// otherwise the front element of the queue is popped and returned.
// NOTE(review): the declaration and return of `e` in the empty-queue branch
// are not visible in this excerpt.
1542 ConnectionEvent Connection::getEvent()
1544 if(m_event_queue.size() == 0){
1546 e.type = CONNEVENT_NONE;
// Queue is non-empty: hand out the oldest queued event.
1549 return m_event_queue.pop_front();
// Fetches the next event from m_event_queue using pop_front(timeout_ms).
// If pop_front throws ItemNotFoundException, an event whose type is
// CONNEVENT_NONE is prepared instead of propagating the exception.
// presumably timeout_ms is the maximum wait in milliseconds -- confirm
// against the queue's pop_front(timeout) implementation.
1552 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1555 return m_event_queue.pop_front(timeout_ms);
// Nothing could be popped: fall back to a CONNEVENT_NONE event.
1556 } catch(ItemNotFoundException &ex){
1558 e.type = CONNEVENT_NONE;
// Appends the command c to the back of m_command_queue.
1563 void Connection::putCommand(ConnectionCommand &c)
1565 m_command_queue.push_back(c);
// Entry point taking the port to serve on.
// NOTE(review): only the declaration of the ConnectionCommand is visible in
// this excerpt; presumably it is filled in with `port` and handed to the
// command queue -- confirm against the full file.
1568 void Connection::Serve(unsigned short port)
1570 ConnectionCommand c;
// Entry point taking the address to connect to.
// NOTE(review): only the declaration of the ConnectionCommand is visible in
// this excerpt; presumably it is filled in with `address` and handed to the
// command queue -- confirm against the full file.
1575 void Connection::Connect(Address address)
1577 ConnectionCommand c;
// Connection status query. While holding m_peers_mutex it checks, in order:
//  - whether m_peers contains exactly one entry,
//  - the m_peers entry for PEER_ID_SERVER,
//  - whether m_peer_id is still PEER_ID_INEXISTENT.
// NOTE(review): the value returned for each check is not visible in this
// excerpt -- confirm against the full file.
1582 bool Connection::Connected()
// Lock guard object for the peer list.
1584 JMutexAutoLock peerlock(m_peers_mutex);
1586 if(m_peers.size() != 1)
1589 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1593 if(m_peer_id == PEER_ID_INEXISTENT)
// Entry point for disconnecting.
// NOTE(review): only the declaration of the ConnectionCommand is visible in
// this excerpt; presumably it is set up as a disconnect command and handed to
// the command queue -- confirm against the full file.
1599 void Connection::Disconnect()
1601 ConnectionCommand c;
// Waits for a connection event and dispatches on its type.
//
// peer_id: output; set to e.peer_id when a CONNEVENT_DATA_RECEIVED event
//          is handled.
// data:    output; set to a SharedBuffer built from e.data in the same case.
// Returns: e.data.getSize() for a CONNEVENT_DATA_RECEIVED event.
// Throws:  NoIncomingDataException for CONNEVENT_NONE (and at the end of the
//          function), ConnectionBindFailed for CONNEVENT_BIND_FAILED.
//
// Peer added/removed events are forwarded to m_bc_peerhandler when it is set.
// NOTE(review): the enclosing loop/switch headers and the ends of the peer
// event cases are not visible in this excerpt -- confirm the control flow
// against the full file.
1606 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
// Wait for the next event, using m_bc_receive_timeout as the timeout.
1609 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
// Log every event other than CONNEVENT_NONE.
1610 if(e.type != CONNEVENT_NONE)
1611 dout_con<<getDesc()<<": Receive: got event: "
1612 <<e.describe()<<std::endl;
// No event was available.
1614 case CONNEVENT_NONE:
1615 throw NoIncomingDataException("No incoming data");
// Payload received: fill the output parameters and return the payload size.
1616 case CONNEVENT_DATA_RECEIVED:
1617 peer_id = e.peer_id;
1618 data = SharedBuffer<u8>(e.data);
1619 return e.data.getSize();
// A temporary Peer built from the event is passed to the handler's peerAdded().
1620 case CONNEVENT_PEER_ADDED: {
1621 Peer tmp(e.peer_id, e.address);
1622 if(m_bc_peerhandler)
1623 m_bc_peerhandler->peerAdded(&tmp);
// Same pattern for removal; the event's timeout flag is passed along.
1625 case CONNEVENT_PEER_REMOVED: {
1626 Peer tmp(e.peer_id, e.address);
1627 if(m_bc_peerhandler)
1628 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
// Binding the socket failed.
1630 case CONNEVENT_BIND_FAILED:
1631 throw ConnectionBindFailed("Failed to bind socket "
1632 "(port already in use?)");
// Fallback: same exception as the CONNEVENT_NONE case.
1635 throw NoIncomingDataException("No incoming data");
// Prepares a send-to-all command for `data` on channel `channelnum`, with
// the given reliable flag.
// NOTE(review): what is done with the command after it is filled in is not
// visible in this excerpt (presumably it is queued via putCommand) -- confirm.
1638 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
// The channel number must be below CHANNEL_COUNT.
1640 assert(channelnum < CHANNEL_COUNT);
1642 ConnectionCommand c;
1643 c.sendToAll(channelnum, data, reliable);
// Prepares a send command for `data` addressed to `peer_id` on channel
// `channelnum`, with the given reliable flag.
// NOTE(review): what is done with the command after it is filled in is not
// visible in this excerpt (presumably it is queued via putCommand) -- confirm.
1647 void Connection::Send(u16 peer_id, u8 channelnum,
1648 SharedBuffer<u8> data, bool reliable)
// The channel number must be below CHANNEL_COUNT.
1650 assert(channelnum < CHANNEL_COUNT);
1652 ConnectionCommand c;
1653 c.send(peer_id, channelnum, data, reliable);
// NOTE(review): no statements of this function are visible in this excerpt,
// so its behavior cannot be described from here. dtime is presumably the
// elapsed time since the previous call -- confirm against the full file.
1657 void Connection::RunTimeouts(float dtime)
// Returns a copy of the `address` member of the peer found by
// getPeer(peer_id); a JMutexAutoLock on m_peers_mutex is held for the lookup.
// NOTE(review): how getPeer() reacts to an unknown peer_id is not visible
// in this excerpt.
1662 Address Connection::GetPeerAddress(u16 peer_id)
1664 JMutexAutoLock peerlock(m_peers_mutex);
1665 return getPeer(peer_id)->address;
// Returns the `avg_rtt` member of the peer found by getPeer(peer_id); a
// JMutexAutoLock on m_peers_mutex is held for the lookup.
// NOTE(review): how getPeer() reacts to an unknown peer_id is not visible
// in this excerpt.
1668 float Connection::GetPeerAvgRTT(u16 peer_id)
1670 JMutexAutoLock peerlock(m_peers_mutex);
1671 return getPeer(peer_id)->avg_rtt;
// Prepares a delete-peer command for peer_id.
// NOTE(review): what is done with the command after it is filled in is not
// visible in this excerpt (presumably it is queued via putCommand) -- confirm.
1674 void Connection::DeletePeer(u16 peer_id)
1676 ConnectionCommand c;
1677 c.deletePeer(peer_id);
// Writes this connection's description (getDesc()) followed by ": " to out.
// No newline is written, so it works as a prefix for a log line.
1681 void Connection::PrintInfo(std::ostream &out)
1683 out<<getDesc()<<": ";
// Convenience overload: calls PrintInfo(std::ostream&) with dout_con.
1686 void Connection::PrintInfo()
1688 PrintInfo(dout_con);
// Builds a short description string of the form "con(<handle>/<peer id>)",
// where <handle> is itos(m_socket.GetHandle()) and <peer id> is
// itos(m_peer_id).
1691 std::string Connection::getDesc()
1693 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";