3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
33 static u16 readPeerId(u8 *packetdata)
35 return readU16(&packetdata[4]);
37 static u8 readChannel(u8 *packetdata)
39 return readU8(&packetdata[6]);
42 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
43 u32 protocol_id, u16 sender_peer_id, u8 channel)
45 u32 packet_size = datasize + BASE_HEADER_SIZE;
46 BufferedPacket p(packet_size);
49 writeU32(&p.data[0], protocol_id);
50 writeU16(&p.data[4], sender_peer_id);
51 writeU8(&p.data[6], channel);
53 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
58 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
59 u32 protocol_id, u16 sender_peer_id, u8 channel)
61 return makePacket(address, *data, data.getSize(),
62 protocol_id, sender_peer_id, channel);
65 SharedBuffer<u8> makeOriginalPacket(
66 SharedBuffer<u8> data)
69 u32 packet_size = data.getSize() + header_size;
70 SharedBuffer<u8> b(packet_size);
72 writeU8(&b[0], TYPE_ORIGINAL);
74 memcpy(&b[header_size], *data, data.getSize());
79 std::list<SharedBuffer<u8> > makeSplitPacket(
80 SharedBuffer<u8> data,
84 // Chunk packets, containing the TYPE_SPLIT header
85 std::list<SharedBuffer<u8> > chunks;
87 u32 chunk_header_size = 7;
88 u32 maximum_data_size = chunksize_max - chunk_header_size;
94 end = start + maximum_data_size - 1;
95 if(end > data.getSize() - 1)
96 end = data.getSize() - 1;
98 u32 payload_size = end - start + 1;
99 u32 packet_size = chunk_header_size + payload_size;
101 SharedBuffer<u8> chunk(packet_size);
103 writeU8(&chunk[0], TYPE_SPLIT);
104 writeU16(&chunk[1], seqnum);
105 // [3] u16 chunk_count is written at next stage
106 writeU16(&chunk[5], chunk_num);
107 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
109 chunks.push_back(chunk);
115 while(end != data.getSize() - 1);
117 for(std::list<SharedBuffer<u8> >::iterator i = chunks.begin();
118 i != chunks.end(); ++i)
121 writeU16(&((*i)[3]), chunk_count);
// Packs data for sending. If the payload plus the 1 byte TYPE_ORIGINAL
// header is larger than chunksize_max, the data is divided with
// makeSplitPacket(); otherwise the list receives a single packet made by
// makeOriginalPacket().
127 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
128 SharedBuffer<u8> data,
132 u32 original_header_size = 1;
133 std::list<SharedBuffer<u8> > list;
// Too large for one packet: split it
134 if(data.getSize() + original_header_size > chunksize_max)
136 list = makeSplitPacket(data, chunksize_max, split_seqnum);
// Fits into a single packet
142 list.push_back(makeOriginalPacket(data));
147 SharedBuffer<u8> makeReliablePacket(
148 SharedBuffer<u8> data,
151 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
152 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
153 <<((unsigned int)data[0]&0xff)<<std::endl;*/
155 u32 packet_size = data.getSize() + header_size;
156 SharedBuffer<u8> b(packet_size);
158 writeU8(&b[0], TYPE_RELIABLE);
159 writeU16(&b[1], seqnum);
161 memcpy(&b[header_size], *data, data.getSize());
163 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
164 <<((unsigned int)data[0]&0xff)<<std::endl;*/
165 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
173 ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {}
// Walks m_list and reads the 16-bit sequence number of every buffered
// packet; it is stored at BASE_HEADER_SIZE+1, right after the type byte.
// NOTE(review): presumably a debugging aid that writes the numbers to a
// log stream - confirm.
175 void ReliablePacketBuffer::print()
177 for(std::list<BufferedPacket>::iterator i = m_list.begin();
181 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185 bool ReliablePacketBuffer::empty()
187 return m_list.empty();
// Reports how many packets the buffer holds, as a u32.
// NOTE(review): presumably returns the m_list_size counter that the
// constructor initializes to 0 - confirm.
189 u32 ReliablePacketBuffer::size()
// Searches the buffer for the packet that carries sequence number seqnum.
// Walks m_list from the front and reads each packet's 16-bit sequence
// number stored at BASE_HEADER_SIZE+1.
// The result can be dereferenced to reach the packet (see popSeqnum()).
193 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
195 std::list<BufferedPacket>::iterator i = m_list.begin();
196 for(; i != m_list.end(); ++i)
198 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
199 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
200 <<", comparing to s="<<s<<std::endl;*/
// Returns the RPBSearchResult value that stands for "no such packet".
// NOTE(review): presumably m_list.end(), i.e. what findPacket() is left
// with when its loop runs to completion - confirm.
206 RPBSearchResult ReliablePacketBuffer::notFound()
210 u16 ReliablePacketBuffer::getFirstSeqnum()
213 throw NotFoundException("Buffer is empty");
214 BufferedPacket p = *m_list.begin();
215 return readU16(&p.data[BASE_HEADER_SIZE+1]);
// Pops the packet at the front of the buffer and returns it by value.
// Throws NotFoundException with the message "Buffer is empty" when there
// is nothing to pop.
217 BufferedPacket ReliablePacketBuffer::popFirst()
220 throw NotFoundException("Buffer is empty");
// Take a copy before the list node is erased
221 BufferedPacket p = *m_list.begin();
222 m_list.erase(m_list.begin());
// Pops the packet that carries sequence number seqnum and returns it by
// value. The packet is located with findPacket().
// Throws NotFoundException("seqnum not found in buffer") when the lookup
// fails; "Not found" is logged to dout_con first.
226 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
228 RPBSearchResult r = findPacket(seqnum);
230 dout_con<<"Not found"<<std::endl;
231 throw NotFoundException("seqnum not found in buffer");
// Copy the packet out through the search result
233 BufferedPacket p = *r;
// Inserts a TYPE_RELIABLE packet at the position that keeps the list
// ordered by sequence number.
// p has to contain the base header plus the 3 byte reliable header (type
// byte followed by a u16 seqnum); the size and the type are enforced with
// assert().
// Throws AlreadyExistsException("Same seqnum in list") for a duplicate
// sequence number.
238 void ReliablePacketBuffer::insert(BufferedPacket &p)
240 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
241 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
242 assert(type == TYPE_RELIABLE);
243 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
246 // Find the right place for the packet and insert it there
248 // If list is empty, just add it
255 // Otherwise find the right place
256 std::list<BufferedPacket>::iterator i = m_list.begin();
257 // Find the first packet in the list which has a higher seqnum
258 for(; i != m_list.end(); ++i){
259 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
262 throw AlreadyExistsException("Same seqnum in list");
// seqnum_higher() decides the ordering
// (presumably aware of sequence number wrap-around - confirm)
264 if(seqnum_higher(s, seqnum)){
268 // If we're at the end of the list, add the packet to the
270 if(i == m_list.end())
// Advances the timers of every buffered packet by dtime.
// totaltime is the value anyTotaltimeReached() later compares against.
280 void ReliablePacketBuffer::incrementTimeouts(float dtime)
282 for(std::list<BufferedPacket>::iterator i = m_list.begin();
283 i != m_list.end(); ++i)
286 i->totaltime += dtime;
// Goes through the buffer and handles every packet whose time has reached
// timeout; this is the same test getTimedOuts() uses to select packets.
// NOTE(review): going by the name, a matching packet gets its time reset
// so that it can time out again - confirm.
290 void ReliablePacketBuffer::resetTimedOuts(float timeout)
292 for(std::list<BufferedPacket>::iterator i = m_list.begin();
293 i != m_list.end(); ++i)
295 if(i->time >= timeout)
300 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
302 for(std::list<BufferedPacket>::iterator i = m_list.begin();
303 i != m_list.end(); ++i)
305 if(i->totaltime >= timeout)
311 std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
313 std::list<BufferedPacket> timed_outs;
314 for(std::list<BufferedPacket>::iterator i = m_list.begin();
315 i != m_list.end(); ++i)
317 if(i->time >= timeout)
318 timed_outs.push_back(*i);
// Destructor: visits every IncomingSplitPacket pointer stored in m_buf.
// NOTE(review): presumably frees them, since insert() allocates the
// entries with new - confirm.
327 IncomingSplitBuffer::~IncomingSplitBuffer()
329 for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
330 i != m_buf.end(); ++i)
336 Returns the reassembled data once every chunk of a split packet has
337 been received; returns an empty buffer otherwise.
// Stores one TYPE_SPLIT chunk and, once the set is complete, assembles the
// full payload from the chunks in chunk number order.
//   p:        packet including the base header and the 7 byte split header
//             (+0 u8 type, +1 u16 seqnum, +3 u16 chunk_count,
//             +5 u16 chunk_num)
//   reliable: stored in the entry when the first chunk of a seqnum arrives
// Returns an empty buffer when the chunk is a duplicate or when chunks
// are still missing.
// NOTE(review): packet size and type are only checked with assert(), and
// the visible code does not compare chunk_num with chunk_count. p is built
// from received data in processPacket(), so confirm malformed packets
// cannot reach this point.
339 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
341 u32 headersize = BASE_HEADER_SIZE + 7;
342 assert(p.data.getSize() >= headersize);
343 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
344 assert(type == TYPE_SPLIT);
345 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
346 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
347 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
349 // Add if doesn't exist
350 if(m_buf.find(seqnum) == m_buf.end())
352 IncomingSplitPacket *sp = new IncomingSplitPacket();
353 sp->chunk_count = chunk_count;
354 sp->reliable = reliable;
358 IncomingSplitPacket *sp = m_buf[seqnum];
360 // TODO: These errors should be thrown or something? Dunno.
// Mismatches against the values of the first chunk are only logged
361 if(chunk_count != sp->chunk_count)
362 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
363 <<" != sp->chunk_count="<<sp->chunk_count
365 if(reliable != sp->reliable)
366 derr_con<<"Connection: WARNING: reliable="<<reliable
367 <<" != sp->reliable="<<sp->reliable
370 // If chunk already exists, ignore it.
371 // Sometimes two identical packets may arrive when there is network
372 // lag and the server re-sends stuff.
373 if(sp->chunks.find(chunk_num) != sp->chunks.end())
374 return SharedBuffer<u8>();
376 // Cut chunk data out of packet
377 u32 chunkdatasize = p.data.getSize() - headersize;
378 SharedBuffer<u8> chunkdata(chunkdatasize);
379 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
381 // Set chunk data in buffer
382 sp->chunks[chunk_num] = chunkdata;
384 // If not all chunks are received, return empty buffer
385 if(sp->allReceived() == false)
386 return SharedBuffer<u8>();
388 // Calculate total size
390 for(std::map<u16, SharedBuffer<u8> >::iterator i = sp->chunks.begin();
391 i != sp->chunks.end(); ++i)
393 totalsize += i->second.getSize();
396 SharedBuffer<u8> fulldata(totalsize);
398 // Copy chunks to data buffer
400 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
403 SharedBuffer<u8> buf = sp->chunks[chunk_i];
// NOTE(review): u16 narrows the result of getSize(); this is only safe
// while a single chunk stays below 64 KiB
404 u16 chunkdatasize = buf.getSize();
405 memcpy(&fulldata[start], *buf, chunkdatasize);
// NOTE(review): the second semicolon is a stray empty statement
406 start += chunkdatasize;;
409 // Remove sp from buffer
// Handles incomplete split packets that were received unreliably and whose
// time has reached timeout. Reliable ones are never selected.
// The keys of the selected entries are first collected in remove_queue and
// then processed in a second loop, which logs each removal.
// NOTE(review): dtime is presumably added to every entry's time before the
// comparison - confirm.
415 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
417 std::list<u16> remove_queue;
418 for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
419 i != m_buf.end(); ++i)
421 IncomingSplitPacket *p = i->second;
422 // Reliable ones are not removed by timeout
423 if(p->reliable == true)
426 if(p->time >= timeout)
427 remove_queue.push_back(i->first);
// Second pass over the collected keys
429 for(std::list<u16>::iterator j = remove_queue.begin();
430 j != remove_queue.end(); ++j)
432 dout_con<<"NOTE: Removing timed out unreliable split packet"
// All three sequence counters start from SEQNUM_INITIAL.
// NOTE(review): presumably the body of the Channel constructor; the
// counters are accessed through Channel pointers elsewhere - confirm.
445 next_outgoing_seqnum = SEQNUM_INITIAL;
446 next_incoming_seqnum = SEQNUM_INITIAL;
447 next_outgoing_split_seqnum = SEQNUM_INITIAL;
// Constructs a peer from its id and address.
// Initial state visible here: timeout counter at 0, has_sent_with_id
// unset, a send rate of 10 packets per second, and congestion control
// defaults of 0.2 for the aimed RTT and 10..400 for the rate limits.
// runTimeouts() later overwrites the congestion control values from the
// settings.
457 Peer::Peer(u16 a_id, Address a_address):
460 timeout_counter(0.0),
464 has_sent_with_id(false),
466 m_max_packets_per_second(10),
469 congestion_control_aim_rtt(0.2),
470 congestion_control_max_rate(400),
471 congestion_control_min_rate(10)
// Feeds one round trip time measurement into the peer's congestion control
// and resend timing.
//  - Send rate: increased by 10 or by 2 packets per second while it is
//    below congestion_control_max_rate (the step of 2 belongs to the
//    branch taken when rtt is below congestion_control_aim_rtt); in the
//    remaining case it is multiplied by 0.8 and kept at or above
//    congestion_control_min_rate.
//  - avg_rtt: moving average made of 10% new sample and 90% history.
//  - resend_timeout: avg_rtt * RESEND_TIMEOUT_FACTOR, clamped to the range
//    RESEND_TIMEOUT_MIN .. RESEND_TIMEOUT_MAX.
478 void Peer::reportRTT(float rtt)
482 if(m_max_packets_per_second < congestion_control_max_rate)
483 m_max_packets_per_second += 10;
484 } else if(rtt < congestion_control_aim_rtt){
485 if(m_max_packets_per_second < congestion_control_max_rate)
486 m_max_packets_per_second += 2;
488 m_max_packets_per_second *= 0.8;
489 if(m_max_packets_per_second < congestion_control_min_rate)
490 m_max_packets_per_second = congestion_control_min_rate;
// A negative avg_rtt is handled separately from the averaging below
// (presumably it marks "no sample yet" - confirm)
496 else if(avg_rtt < 0.0)
499 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
501 // Calculate resend_timeout
503 /*int reliable_count = 0;
504 for(int i=0; i<CHANNEL_COUNT; i++)
506 reliable_count += channels[i].outgoing_reliables.size();
508 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
509 * ((float)reliable_count * 1);*/
511 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
512 if(timeout < RESEND_TIMEOUT_MIN)
513 timeout = RESEND_TIMEOUT_MIN;
514 if(timeout > RESEND_TIMEOUT_MAX)
515 timeout = RESEND_TIMEOUT_MAX;
516 resend_timeout = timeout;
// Creates a connection that has no peer handler (m_bc_peerhandler is
// NULL). Stores the protocol id and the maximum packet size, starts with
// a receive timeout of 0 and gives the socket a timeout of 5 ms.
523 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
525 m_protocol_id(protocol_id),
526 m_max_packet_size(max_packet_size),
530 m_bc_peerhandler(NULL),
531 m_bc_receive_timeout(0),
534 m_socket.setTimeoutMs(5);
// Creates a connection that keeps the given peer handler in
// m_bc_peerhandler. Otherwise set up like the overload without a handler:
// protocol id and maximum packet size are stored, the receive timeout
// starts at 0 and the socket gets a timeout of 5 ms.
539 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
540 bool ipv6, PeerHandler *peerhandler):
541 m_protocol_id(protocol_id),
542 m_max_packet_size(max_packet_size),
546 m_bc_peerhandler(peerhandler),
547 m_bc_receive_timeout(0),
550 m_socket.setTimeoutMs(5);
// Destructor: iterates over all entries of m_peers.
// NOTE(review): presumably frees the Peer objects, which connect() and
// receive() allocate with new - confirm.
556 Connection::~Connection()
560 for(std::map<u16, Peer*>::iterator
562 j != m_peers.end(); ++j)
// Thread entry point of the connection.
// Registers the thread under the name "Connection" for logging, then keeps
// track of elapsed time: dtime is the difference between two readings of
// porting::getTimeMs(), converted from milliseconds to seconds.
// Queued ConnectionCommands are taken from m_command_queue one by one
// inside the debug exception handler section.
570 void * Connection::Thread()
573 log_register_thread("Connection");
575 dout_con<<"Connection thread started"<<std::endl;
577 u32 curtime = porting::getTimeMs();
578 u32 lasttime = curtime;
582 BEGIN_DEBUG_EXCEPTION_HANDLER
585 curtime = porting::getTimeMs();
586 float dtime = (float)(curtime - lasttime) / 1000.;
// Drain the command queue
594 while(!m_command_queue.empty()){
595 ConnectionCommand c = m_command_queue.pop_front();
603 END_DEBUG_EXCEPTION_HANDLER(derr_con);
609 void Connection::putEvent(ConnectionEvent &e)
611 assert(e.type != CONNEVENT_NONE);
612 m_event_queue.push_back(e);
// Executes one ConnectionCommand: dispatches on the command type and logs
// the command to dout_con before carrying it out.
// Calls visible here: CONNCMD_SEND -> send(), CONNCMD_SEND_TO_ALL ->
// sendToAll(), CONNCMD_DELETE_PEER -> deletePeer(c.peer_id, false).
615 void Connection::processCommand(ConnectionCommand &c)
619 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
622 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
626 case CONNCMD_CONNECT:
627 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
630 case CONNCMD_DISCONNECT:
631 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
635 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
636 send(c.peer_id, c.channelnum, c.data, c.reliable);
638 case CONNCMD_SEND_TO_ALL:
639 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
640 sendToAll(c.channelnum, c.data, c.reliable);
642 case CONNCMD_DELETE_PEER:
643 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
644 deletePeer(c.peer_id, false);
// Flushes m_outgoing_queue with a per-peer rate limit.
// For every peer the budget of this step is computed as
// m_sendtime_accu * m_max_packets_per_second. A queued packet is passed to
// rawSendAsPacket() only while its peer is below that budget and its
// channel holds fewer than 5 buffered outgoing reliable packets; all other
// packets are postponed and put back on the queue in their original order.
// Afterwards each peer's time accumulator is reduced by the time that the
// sent packets account for and capped at the equivalent of 10 packets.
649 void Connection::send(float dtime)
// Pass 1: compute the budget of every peer
651 for(std::map<u16, Peer*>::iterator
653 j != m_peers.end(); ++j)
655 Peer *peer = j->second;
656 peer->m_sendtime_accu += dtime;
657 peer->m_num_sent = 0;
658 peer->m_max_num_sent = peer->m_sendtime_accu *
659 peer->m_max_packets_per_second;
// Pass 2: send what the budgets allow, postpone the rest
661 Queue<OutgoingPacket> postponed_packets;
662 while(!m_outgoing_queue.empty()){
663 OutgoingPacket packet = m_outgoing_queue.pop_front();
664 Peer *peer = getPeerNoEx(packet.peer_id);
667 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
668 postponed_packets.push_back(packet);
669 } else if(peer->m_num_sent < peer->m_max_num_sent){
670 rawSendAsPacket(packet.peer_id, packet.channelnum,
671 packet.data, packet.reliable);
674 postponed_packets.push_back(packet);
677 while(!postponed_packets.empty()){
678 m_outgoing_queue.push_back(postponed_packets.pop_front());
// Pass 3: charge the accumulators for what was sent
680 for(std::map<u16, Peer*>::iterator
682 j != m_peers.end(); ++j)
684 Peer *peer = j->second;
685 peer->m_sendtime_accu -= (float)peer->m_num_sent /
686 peer->m_max_packets_per_second;
687 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
688 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
// Overview of the steps in this function:
//  1. Data that has become complete inside the per-channel buffers is
//     fetched with getFromBuffers() and put in a dataReceived event.
//  2. A datagram is read from m_socket. Its size is checked against the
//     base header size and its first 32 bits against m_protocol_id; the
//     channel number has to be below CHANNEL_COUNT.
//  3. A sender that uses PEER_ID_INEXISTENT is matched by address against
//     peers with has_sent_with_id == false, or registered as a new peer.
//     A new peer gets an unused id, a peerAdded event and a reliable
//     CONTROLTYPE_SET_PEER_ID packet telling it that id.
//  4. The sender address is compared with the address stored for the
//     peer, and the peer's timeout counter is reset.
//  5. The payload without the base header goes through processPacket();
//     the returned data is put in a dataReceived event.
692 // Receive packets from the network and buffers and create ConnectionEvents
693 void Connection::receive()
695 u32 datasize = m_max_packet_size * 2; // Double it just to be safe
696 // TODO: We can not know how many layers of header there are.
697 // For now, just assume there are no other than the base headers.
698 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
699 SharedBuffer<u8> packetdata(packet_maxsize);
701 bool single_wait_done = false;
706 /* Check if some buffer has relevant data */
709 SharedBuffer<u8> resultdata;
710 bool got = getFromBuffers(peer_id, resultdata);
713 e.dataReceived(peer_id, resultdata);
// After the first wait, only poll the socket without blocking
719 if(single_wait_done){
720 if(m_socket.WaitData(0) == false)
724 single_wait_done = true;
727 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
729 if(received_size < 0)
731 if(received_size < BASE_HEADER_SIZE)
733 if(readU32(&packetdata[0]) != m_protocol_id)
736 u16 peer_id = readPeerId(*packetdata);
737 u8 channelnum = readChannel(*packetdata);
738 if(channelnum > CHANNEL_COUNT-1){
// NOTE(review): channelnum is streamed as a u8 here, while other log
// statements in this file cast the channel to int first
740 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
741 throw InvalidIncomingDataException("Channel doesn't exist");
744 if(peer_id == PEER_ID_INEXISTENT)
747 Somebody is trying to send stuff to us with no peer id.
749 Check if the same address and port was added to our peer
751 Allow only entries that have has_sent_with_id==false.
754 std::map<u16, Peer*>::iterator j;
756 for(; j != m_peers.end(); ++j)
758 Peer *peer = j->second;
759 if(peer->has_sent_with_id)
761 if(peer->address == sender)
766 If no peer was found with the same address and port,
767 we shall assume it is a new peer and create an entry.
769 if(j == m_peers.end())
771 // Pass on to adding the peer
773 // Else: A peer was found.
776 Peer *peer = j->second;
779 derr_con<<"WARNING: Assuming unknown peer to be "
780 <<"peer_id="<<peer_id<<std::endl;
785 The peer was not found in our lists. Add it.
787 if(peer_id == PEER_ID_INEXISTENT)
789 // Somebody wants to make a new connection
791 // Get a unique peer id (2 or higher)
794 Find an unused peer id
796 bool out_of_ids = false;
800 if(m_peers.find(peer_id_new) == m_peers.end())
802 // Check for overflow
803 if(peer_id_new == 65535){
810 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
815 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
816 " giving peer_id="<<peer_id_new<<std::endl;
819 Peer *peer = new Peer(peer_id_new, sender);
820 m_peers[peer->id] = peer;
822 // Create peer addition event
824 e.peerAdded(peer_id_new, sender);
827 // Create CONTROL packet to tell the peer id to the new peer.
828 SharedBuffer<u8> reply(4);
829 writeU8(&reply[0], TYPE_CONTROL);
830 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
831 writeU16(&reply[2], peer_id_new);
832 sendAsPacket(peer_id_new, 0, reply, true);
834 // We're now talking to a valid peer_id
835 peer_id = peer_id_new;
837 // Go on and process whatever it sent
840 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
842 if(node == m_peers.end())
845 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
846 // and it is invalid.
848 derr_con<<"Receive(): Peer not found"<<std::endl;
849 throw InvalidIncomingDataException("Peer not found (possible timeout)");
852 Peer *peer = node->second;
854 // Validate peer address
855 if(peer->address != sender)
858 derr_con<<"Peer "<<peer_id<<" sending from different address."
859 " Ignoring."<<std::endl;
// Any accepted packet counts as a sign of life from the peer
863 peer->timeout_counter = 0.0;
865 Channel *channel = &(peer->channels[channelnum]);
867 // Throw the received packet to channel->processPacket()
869 // Make a new SharedBuffer from the data without the base headers
870 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
871 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
872 strippeddata.getSize());
875 // Process it (the result is some data with no headers made by us)
876 SharedBuffer<u8> resultdata = processPacket
877 (channel, strippeddata, peer_id, channelnum, false);
880 dout_con<<"ProcessPacket returned data of size "
881 <<resultdata.getSize()<<std::endl;
884 e.dataReceived(peer_id, resultdata);
887 }catch(ProcessedSilentlyException &e){
889 }catch(InvalidIncomingDataException &e){
891 catch(ProcessedSilentlyException &e){
// Periodic housekeeping; dtime is the time step to account for.
//  - Copies the congestion control parameters from g_settings to every
//    peer.
//  - Collects peers for removal when their timeout_counter exceeds
//    m_timeout (receive() resets that counter whenever a packet from the
//    peer is accepted), or when a buffered outgoing reliable packet has
//    reached a total time of m_timeout.
//  - Per channel: lets unreliable split packets time out, advances the
//    timers of outgoing reliable packets, and selects those that waited
//    for resend_timeout for re-sending. The timeout is then reported to
//    the peer as an RTT sample.
//  - Sends a reliable CONTROLTYPE_PING on channel 0 whenever ping_timer
//    has reached 5.0.
//  - Finally removes the collected peers with deletePeer(id, true).
896 void Connection::runTimeouts(float dtime)
898 float congestion_control_aim_rtt
899 = g_settings->getFloat("congestion_control_aim_rtt");
900 float congestion_control_max_rate
901 = g_settings->getFloat("congestion_control_max_rate");
902 float congestion_control_min_rate
903 = g_settings->getFloat("congestion_control_min_rate");
905 std::list<u16> timeouted_peers;
906 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
907 j != m_peers.end(); ++j)
909 Peer *peer = j->second;
911 // Update congestion control values
912 peer->congestion_control_aim_rtt = congestion_control_aim_rtt;
913 peer->congestion_control_max_rate = congestion_control_max_rate;
914 peer->congestion_control_min_rate = congestion_control_min_rate;
919 peer->timeout_counter += dtime;
920 if(peer->timeout_counter > m_timeout)
923 derr_con<<"RunTimeouts(): Peer "<<peer->id
925 <<" (source=peer->timeout_counter)"
927 // Add peer to the list
928 timeouted_peers.push_back(peer->id);
929 // Don't bother going through the buffers of this one
// Cached so that reportRTT() below does not change the timeout used for
// the channels still to be checked
933 float resend_timeout = peer->resend_timeout;
934 for(u16 i=0; i<CHANNEL_COUNT; i++)
936 std::list<BufferedPacket> timed_outs;
938 Channel *channel = &peer->channels[i];
940 // Remove timed out incomplete unreliable split packets
941 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
943 // Increment reliable packet times
944 channel->outgoing_reliables.incrementTimeouts(dtime);
946 // Check reliable packet total times, remove peer if
948 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
951 derr_con<<"RunTimeouts(): Peer "<<peer->id
953 <<" (source=reliable packet totaltime)"
955 // Add peer to the to-be-removed list
956 timeouted_peers.push_back(peer->id);
960 // Re-send timed out outgoing reliables
962 timed_outs = channel->
963 outgoing_reliables.getTimedOuts(resend_timeout);
965 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
967 for(std::list<BufferedPacket>::iterator j = timed_outs.begin();
968 j != timed_outs.end(); ++j)
970 u16 peer_id = readPeerId(*(j->data));
// NOTE(review): this u8 hides the Channel pointer of the same name that
// is declared in the enclosing loop
971 u8 channel = readChannel(*(j->data));
972 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
975 derr_con<<"RE-SENDING timed-out RELIABLE to ";
976 j->address.print(&derr_con);
977 derr_con<<"(t/o="<<resend_timeout<<"): "
978 <<"from_peer_id="<<peer_id
979 <<", channel="<<((int)channel&0xff)
980 <<", seqnum="<<seqnum
985 // Enlarge avg_rtt and resend_timeout:
986 // The rtt will be at least the timeout.
987 // NOTE: This won't affect the timeout of the next
988 // checked channel because it was cached.
989 peer->reportRTT(resend_timeout);
996 peer->ping_timer += dtime;
997 if(peer->ping_timer >= 5.0)
999 // Create and send PING packet
1000 SharedBuffer<u8> data(2);
1001 writeU8(&data[0], TYPE_CONTROL);
1002 writeU8(&data[1], CONTROLTYPE_PING);
1003 rawSendAsPacket(peer->id, 0, data, true);
1005 peer->ping_timer = 0.0;
1012 // Remove timed out peers
1013 for(std::list<u16>::iterator i = timeouted_peers.begin();
1014 i != timeouted_peers.end(); ++i)
1016 PrintInfo(derr_con);
1017 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1018 deletePeer(*i, true);
// Starts acting as a server: binds the socket to port and takes the peer
// id PEER_ID_SERVER. A SocketException raised on the way is caught inside
// this function.
1022 void Connection::serve(u16 port)
1024 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1026 m_socket.Bind(port);
1027 m_peer_id = PEER_ID_SERVER;
1029 catch(SocketException &e){
// Starts connecting to the server at address.
// Registers the server as peer PEER_ID_SERVER, creates a peerAdded event
// for it, sets our own id to PEER_ID_INEXISTENT and sends an empty
// reliable packet on channel 0. On the receiving side, receive() reacts to
// PEER_ID_INEXISTENT by assigning an id and replying with
// CONTROLTYPE_SET_PEER_ID.
// Throws ConnectionException("Already connected to a server") if a peer
// with id PEER_ID_SERVER already exists.
1037 void Connection::connect(Address address)
1039 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1040 <<":"<<address.getPort()<<std::endl;
1042 std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1043 if(node != m_peers.end()){
1044 throw ConnectionException("Already connected to a server");
1047 Peer *peer = new Peer(PEER_ID_SERVER, address);
1048 m_peers[peer->id] = peer;
1052 e.peerAdded(peer->id, peer->address);
1057 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1058 m_peer_id = PEER_ID_INEXISTENT;
1059 SharedBuffer<u8> data(0);
1060 Send(PEER_ID_SERVER, 0, data, true);
// Tells every known peer that this end is leaving: builds a 2 byte
// TYPE_CONTROL / CONTROLTYPE_DISCO packet and passes it to
// rawSendAsPacket() for each peer, unreliably on channel 0.
1063 void Connection::disconnect()
1065 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1067 // Create and send DISCO packet
1068 SharedBuffer<u8> data(2);
1069 writeU8(&data[0], TYPE_CONTROL);
1070 writeU8(&data[1], CONTROLTYPE_DISCO);
1073 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1074 j != m_peers.end(); ++j)
1076 Peer *peer = j->second;
1077 rawSendAsPacket(peer->id, 0, data, false);
1081 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1083 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1084 j != m_peers.end(); ++j)
1086 Peer *peer = j->second;
1087 send(peer->id, channelnum, data, reliable);
1091 void Connection::send(u16 peer_id, u8 channelnum,
1092 SharedBuffer<u8> data, bool reliable)
1094 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1096 assert(channelnum < CHANNEL_COUNT);
1098 Peer *peer = getPeerNoEx(peer_id);
1101 Channel *channel = &(peer->channels[channelnum]);
1103 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1105 chunksize_max -= RELIABLE_HEADER_SIZE;
1107 std::list<SharedBuffer<u8> > originals;
1108 originals = makeAutoSplitPacket(data, chunksize_max,
1109 channel->next_outgoing_split_seqnum);
1111 for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
1112 i != originals.end(); ++i)
1114 SharedBuffer<u8> original = *i;
1116 sendAsPacket(peer_id, channelnum, original, reliable);
1120 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1121 SharedBuffer<u8> data, bool reliable)
1123 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1124 m_outgoing_queue.push_back(packet);
// Builds the final packet for data, addressed to the given peer.
// Reliable case: the channel's next outgoing sequence number is consumed,
// data is wrapped with makeReliablePacket(), the base header is added and
// the packet is stored in the channel's outgoing_reliables buffer. A
// sequence number that is already buffered only produces a warning.
// Unreliable case: only the base header is added.
1127 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1128 SharedBuffer<u8> data, bool reliable)
1130 Peer *peer = getPeerNoEx(peer_id);
1133 Channel *channel = &(peer->channels[channelnum]);
1137 u16 seqnum = channel->next_outgoing_seqnum;
1138 channel->next_outgoing_seqnum++;
// NOTE(review): this local has the same name as the bool parameter
1140 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1142 // Add base headers and make a packet
1143 BufferedPacket p = makePacket(peer->address, reliable,
1144 m_protocol_id, m_peer_id, channelnum);
1147 // Buffer the packet
1148 channel->outgoing_reliables.insert(p);
1150 catch(AlreadyExistsException &e)
1152 PrintInfo(derr_con);
1153 derr_con<<"WARNING: Going to send a reliable packet "
1154 "seqnum="<<seqnum<<" that is already "
1155 "in outgoing buffer"<<std::endl;
1164 // Add base headers and make a packet
1165 BufferedPacket p = makePacket(peer->address, data,
1166 m_protocol_id, m_peer_id, channelnum);
1173 void Connection::rawSend(const BufferedPacket &packet)
1176 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1177 } catch(SendFailedException &e){
1178 derr_con<<"Connection::rawSend(): SendFailedException: "
1179 <<packet.address.serializeString()<<std::endl;
1183 Peer* Connection::getPeer(u16 peer_id)
1185 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1187 if(node == m_peers.end()){
1188 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1192 assert(node->second->id == peer_id);
1194 return node->second;
1197 Peer* Connection::getPeerNoEx(u16 peer_id)
1199 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1201 if(node == m_peers.end()){
1206 assert(node->second->id == peer_id);
1208 return node->second;
1211 std::list<Peer*> Connection::getPeers()
1213 std::list<Peer*> list;
1214 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1215 j != m_peers.end(); ++j)
1217 Peer *peer = j->second;
1218 list.push_back(peer);
// Looks through all channels of all peers for buffered data that has
// become deliverable, using checkIncomingBuffers() for each channel.
//   peer_id: handed on to checkIncomingBuffers(), which sets it to the
//            sender of the packet it takes out of the buffer
//   dst:     output parameter for the payload
// Returns a bool that receive() treats as "something was found".
1223 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1225 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1226 j != m_peers.end(); ++j)
1228 Peer *peer = j->second;
1229 for(u16 i=0; i<CHANNEL_COUNT; i++)
1231 Channel *channel = &peer->channels[i];
1232 SharedBuffer<u8> resultdata;
1233 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
// Delivers the next in-order reliable packet of a channel if it is already
// waiting in incoming_reliables.
// Packets older than next_incoming_seqnum are popped from the front first.
// If the first buffered packet then has exactly the expected sequence
// number, it is popped, next_incoming_seqnum is advanced, the base and
// reliable headers are stripped, and the payload is run through
// processPacket() with the reliable flag set. The result is stored in dst;
// peer_id is set from the base header of the packet.
1243 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1244 SharedBuffer<u8> &dst)
1246 u16 firstseqnum = 0;
1247 // Clear old packets from start of buffer
1250 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1251 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1252 channel->incoming_reliables.popFirst();
1256 // This happens if all packets are old
// NOTE(review): caught by value, while the other handlers in this file
// catch by reference
1257 }catch(con::NotFoundException)
1260 if(channel->incoming_reliables.empty() == false)
1262 if(firstseqnum == channel->next_incoming_seqnum)
1264 BufferedPacket p = channel->incoming_reliables.popFirst();
1266 peer_id = readPeerId(*p.data);
1267 u8 channelnum = readChannel(*p.data);
1268 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1271 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1272 <<" seqnum="<<seqnum
1273 <<" peer_id="<<peer_id
1274 <<" channel="<<((int)channelnum&0xff)
1277 channel->next_incoming_seqnum++;
1279 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1280 // Get out the inside packet and re-process it
1281 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1282 memcpy(*payload, &p.data[headers_size], payload.getSize());
1284 dst = processPacket(channel, payload, peer_id, channelnum, true);
// Interprets one packet whose base header has already been removed and
// returns the user payload it carries, if any.
//   channel:    channel state belonging to the sender
//   packetdata: packet contents starting at the type byte
//   peer_id:    id of the sending peer
//   channelnum: number of the channel the packet arrived on
//   reliable:   true when packetdata was taken out of a TYPE_RELIABLE
//               packet
// Handling by packet type:
//   TYPE_CONTROL  - ACK, SET_PEER_ID, PING and DISCO are dealt with here
//                   and end in a ProcessedSilentlyException
//   TYPE_ORIGINAL - the payload behind the header is extracted
//   TYPE_SPLIT    - the chunk goes to incoming_splits; a non-empty result
//                   is the reassembled data, otherwise
//                   ProcessedSilentlyException is thrown
//   TYPE_RELIABLE - an ACK is sent back; packets from the future are
//                   buffered, old ones are rejected, and the expected one
//                   is unwrapped and processed recursively
// Throws InvalidIncomingDataException for truncated packets, unknown
// packet or control types, nested reliable packets and old reliable
// packets.
1291 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1292 SharedBuffer<u8> packetdata, u16 peer_id,
1293 u8 channelnum, bool reliable)
1295 IndentationRaiser iraiser(&(m_indentation));
1297 if(packetdata.getSize() < 1)
1298 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1300 u8 type = readU8(&packetdata[0]);
1302 if(type == TYPE_CONTROL)
1304 if(packetdata.getSize() < 2)
1305 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1307 u8 controltype = readU8(&packetdata[1]);
1309 if(controltype == CONTROLTYPE_ACK)
1311 if(packetdata.getSize() < 4)
1312 throw InvalidIncomingDataException
1313 ("packetdata.getSize() < 4 (ACK header size)");
1315 u16 seqnum = readU16(&packetdata[2]);
1317 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1318 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1319 <<", seqnum="<<seqnum<<std::endl;
// The acknowledged packet leaves the resend buffer
1322 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1323 // Get round trip time
1324 float rtt = p.totaltime;
1326 // Let peer calculate stuff according to it
1327 // (avg_rtt and resend_timeout)
1328 Peer *peer = getPeer(peer_id);
1329 peer->reportRTT(rtt);
1331 //PrintInfo(dout_con);
1332 //dout_con<<"RTT = "<<rtt<<std::endl;
1334 /*dout_con<<"OUTGOING: ";
1336 channel->outgoing_reliables.print();
1337 dout_con<<std::endl;*/
1339 catch(NotFoundException &e){
1340 PrintInfo(derr_con);
1341 derr_con<<"WARNING: ACKed packet not "
1346 throw ProcessedSilentlyException("Got an ACK");
1348 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1350 if(packetdata.getSize() < 4)
1351 throw InvalidIncomingDataException
1352 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1353 u16 peer_id_new = readU16(&packetdata[2]);
1355 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
// An id that has already been assigned is kept
1357 if(GetPeerID() != PEER_ID_INEXISTENT)
1359 PrintInfo(derr_con);
1360 derr_con<<"WARNING: Not changing"
1361 " existing peer id."<<std::endl;
1365 dout_con<<"changing."<<std::endl;
1366 SetPeerID(peer_id_new);
1368 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1370 else if(controltype == CONTROLTYPE_PING)
1372 // Just ignore it, the incoming data already reset
1373 // the timeout counter
1375 dout_con<<"PING"<<std::endl;
1376 throw ProcessedSilentlyException("Got a PING");
1378 else if(controltype == CONTROLTYPE_DISCO)
1380 // The peer announced that it is disconnecting:
1381 // remove it with deletePeer()
1383 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1385 if(deletePeer(peer_id, false) == false)
1387 PrintInfo(derr_con);
1388 derr_con<<"DISCO: Peer not found"<<std::endl;
1391 throw ProcessedSilentlyException("Got a DISCO");
1394 PrintInfo(derr_con);
1395 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1396 <<((int)controltype&0xff)<<std::endl;
1397 throw InvalidIncomingDataException("Invalid control type");
1400 else if(type == TYPE_ORIGINAL)
1402 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1403 throw InvalidIncomingDataException
1404 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1406 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1408 // Get the inside packet out and return it
1409 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1410 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1413 else if(type == TYPE_SPLIT)
1415 // We have to create a packet again for buffering
1416 // This isn't actually too bad an idea.
1417 BufferedPacket packet = makePacket(
1418 getPeer(peer_id)->address,
1423 // Buffer the packet
1424 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
// A non-empty result means that this chunk completed the split packet
1425 if(data.getSize() != 0)
1428 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1429 <<"size="<<data.getSize()<<std::endl;
1433 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1434 throw ProcessedSilentlyException("Buffered a split packet chunk");
1436 else if(type == TYPE_RELIABLE)
1438 // Recursive reliable packets not allowed
1440 throw InvalidIncomingDataException("Found nested reliable packets");
1442 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1443 throw InvalidIncomingDataException
1444 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1446 u16 seqnum = readU16(&packetdata[1]);
1448 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1449 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1452 if(is_future_packet)
1453 dout_con<<"BUFFERING";
1454 else if(is_old_packet)
1458 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1459 <<" next="<<channel->next_incoming_seqnum;
1460 dout_con<<" [sending CONTROLTYPE_ACK"
1461 " to peer_id="<<peer_id<<"]";
1462 dout_con<<std::endl;
1465 //assert(channel->incoming_reliables.size() < 100);
1467 // Send a CONTROLTYPE_ACK
1468 SharedBuffer<u8> reply(4);
1469 writeU8(&reply[0], TYPE_CONTROL);
1470 writeU8(&reply[1], CONTROLTYPE_ACK);
1471 writeU16(&reply[2], seqnum);
// The ACK itself is unreliable and does not go through the outgoing queue
1472 rawSendAsPacket(peer_id, channelnum, reply, false);
1474 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1475 if(is_future_packet)
1478 dout_con<<"Buffering reliable packet (seqnum="
1479 <<seqnum<<")"<<std::endl;*/
1481 // This one comes later, buffer it.
1482 // Actually we have to make a packet to buffer one.
1483 // Well, we have all the ingredients, so just do it.
1484 BufferedPacket packet = makePacket(
1485 getPeer(peer_id)->address,
1491 channel->incoming_reliables.insert(packet);
1494 dout_con<<"INCOMING: ";
1495 channel->incoming_reliables.print();
1496 dout_con<<std::endl;*/
1498 catch(AlreadyExistsException &e)
1502 throw ProcessedSilentlyException("Buffered future reliable packet");
1504 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1505 else if(is_old_packet)
1507 // An old packet, dump it
1508 throw InvalidIncomingDataException("Got an old reliable packet");
// Exactly the expected packet: advance and unwrap
1511 channel->next_incoming_seqnum++;
1513 // Get out the inside packet and re-process it
1514 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1515 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1517 return processPacket(channel, payload, peer_id, channelnum, true);
1521 PrintInfo(derr_con);
1522 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1523 throw InvalidIncomingDataException("Invalid packet type");
1526 // We should never get here.
1527 // If you get here, add an exception or a return to some of the
1528 // above conditionals.
// NOTE(review): the message names Channel::ProcessPacket() although this
// function is Connection::processPacket()
1530 throw BaseException("Error in Channel::ProcessPacket()");
// Removes the peer stored under peer_id in m_peers.
// peer_id: key of the peer in m_peers.
// timeout: forwarded unchanged to peerRemoved().
// Returns a bool; a caller in this file treats `false` as "peer not found".
1533 bool Connection::deletePeer(u16 peer_id, bool timeout)
// Guard: nothing is registered under this id.
1535 if(m_peers.find(peer_id) == m_peers.end())
// Keep the pointer so the peer's address can be read for the removal event.
1538 Peer *peer = m_peers[peer_id];
// Record id, timeout flag and address in `e` (presumably a ConnectionEvent
// that is then queued -- TODO confirm).
1542 e.peerRemoved(peer_id, timeout, peer->address);
// m_peers holds raw Peer pointers that are freed here; delete the object
// first, then drop the map entry so no dangling pointer stays behind.
1545 delete m_peers[peer_id];
1546 m_peers.erase(peer_id);
// Fetches the next event from m_event_queue.
// An empty queue is signalled with an event whose type is CONNEVENT_NONE
// rather than by popping from the queue.
1552 ConnectionEvent Connection::getEvent()
1554 if(m_event_queue.empty()){
// Nothing queued: mark the placeholder event as "no event".
1556 e.type = CONNEVENT_NONE;
// Queue has at least one entry: hand out the front one.
1559 return m_event_queue.pop_front();
// Fetches the next event from m_event_queue, passing timeout_ms on to the
// queue's pop_front().
// timeout_ms: timeout handed to pop_front() (milliseconds, going by the name).
// If pop_front() throws ItemNotFoundException, the event is marked
// CONNEVENT_NONE instead of letting the exception escape.
1562 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1565 return m_event_queue.pop_front(timeout_ms);
// No item became available: fall back to a "no event" placeholder.
1566 } catch(ItemNotFoundException &ex){
1568 e.type = CONNEVENT_NONE;
// Appends command c to the back of m_command_queue.
1573 void Connection::putCommand(ConnectionCommand &c)
1575 m_command_queue.push_back(c);
// Public entry point for serving on the given port.
// port: port number to serve on.
// NOTE(review): presumably the command is filled in with the port and handed
// to the command queue -- confirm.
1578 void Connection::Serve(unsigned short port)
1580 ConnectionCommand c;
// Public entry point for connecting to the given address.
// address: remote address to connect to (taken by value).
// NOTE(review): presumably the command is filled in with the address and
// handed to the command queue -- confirm.
1585 void Connection::Connect(Address address)
1587 ConnectionCommand c;
// Reports whether this connection counts as connected.
// The checks below inspect m_peers and m_peer_id after locking m_peers_mutex.
// NOTE(review): presumably each failed check yields false -- confirm.
1592 bool Connection::Connected()
1594 JMutexAutoLock peerlock(m_peers_mutex);
// Check 1: there must be exactly one known peer.
1596 if(m_peers.size() != 1)
// Check 2: that peer must be registered under PEER_ID_SERVER.
1599 std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1600 if(node == m_peers.end())
// Check 3: our own peer id must differ from PEER_ID_INEXISTENT.
1603 if(m_peer_id == PEER_ID_INEXISTENT)
// Public entry point for disconnecting.
// NOTE(review): presumably the command is set up as a disconnect request and
// handed to the command queue -- confirm.
1609 void Connection::Disconnect()
1611 ConnectionCommand c;
// Receives data by waiting for connection events and dispatching on their
// type.
// peer_id: out-parameter; set to the event's peer_id when data is received.
// data: out-parameter; set to a SharedBuffer built from the event's data.
// Returns the size of the received data (e.data.getSize()).
// Throws NoIncomingDataException when the event type is CONNEVENT_NONE and
// ConnectionBindFailed when it is CONNEVENT_BIND_FAILED.
1616 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
// Wait for the next event, using m_bc_receive_timeout as the timeout.
1619 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
// Only real events are logged; CONNEVENT_NONE would just add noise.
1620 if(e.type != CONNEVENT_NONE)
1621 dout_con<<getDesc()<<": Receive: got event: "
1622 <<e.describe()<<std::endl;
// No event was available.
1624 case CONNEVENT_NONE:
1625 throw NoIncomingDataException("No incoming data");
// Payload arrived: copy sender id and data to the out-parameters.
1626 case CONNEVENT_DATA_RECEIVED:
1627 peer_id = e.peer_id;
1628 data = SharedBuffer<u8>(e.data);
1629 return e.data.getSize();
// Peer added: notify the handler, if one is set, with a temporary Peer
// built from the event's id and address.
1630 case CONNEVENT_PEER_ADDED: {
1631 Peer tmp(e.peer_id, e.address);
1632 if(m_bc_peerhandler)
1633 m_bc_peerhandler->peerAdded(&tmp);
// Peer removed: same pattern, additionally passing the event's timeout flag.
1635 case CONNEVENT_PEER_REMOVED: {
1636 Peer tmp(e.peer_id, e.address);
1637 if(m_bc_peerhandler)
1638 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
// Binding the socket failed: surface it to the caller as an exception.
1640 case CONNEVENT_BIND_FAILED:
1641 throw ConnectionBindFailed("Failed to bind socket "
1642 "(port already in use?)");
// Final fallback after the dispatch above (presumably not normally reached --
// TODO confirm).
1645 throw NoIncomingDataException("No incoming data");
// Prepares a "send to all" command for the given channel.
// channelnum: channel index; must be below CHANNEL_COUNT (asserted).
// data: payload buffer, passed on to the command.
// reliable: passed on to the command unchanged.
// NOTE(review): presumably the command is then queued via putCommand() --
// confirm.
1648 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
// An out-of-range channel is treated as a programming error.
1650 assert(channelnum < CHANNEL_COUNT);
1652 ConnectionCommand c;
1653 c.sendToAll(channelnum, data, reliable);
// Prepares a send command addressed to a single peer.
// peer_id: id of the receiving peer.
// channelnum: channel index; must be below CHANNEL_COUNT (asserted).
// data: payload buffer, passed on to the command.
// reliable: passed on to the command unchanged.
// NOTE(review): presumably the command is then queued via putCommand() --
// confirm.
1657 void Connection::Send(u16 peer_id, u8 channelnum,
1658 SharedBuffer<u8> data, bool reliable)
// An out-of-range channel is treated as a programming error.
1660 assert(channelnum < CHANNEL_COUNT);
1662 ConnectionCommand c;
1663 c.send(peer_id, channelnum, data, reliable);
// Timeout-processing entry point taking a float dtime.
// NOTE(review): dtime is presumably the elapsed time since the previous call;
// the unit is not established here -- confirm.
1667 void Connection::RunTimeouts(float dtime)
// Returns a copy of the address of the peer found by getPeer(peer_id).
// peer_id: id of the peer to look up.
// m_peers_mutex is locked through a JMutexAutoLock before the lookup.
// NOTE(review): behaviour for an unknown peer_id depends on getPeer() --
// confirm whether it throws or can return null.
1672 Address Connection::GetPeerAddress(u16 peer_id)
1674 JMutexAutoLock peerlock(m_peers_mutex);
1675 return getPeer(peer_id)->address;
// Returns the avg_rtt field of the peer found by getPeer(peer_id).
// peer_id: id of the peer to look up.
// m_peers_mutex is locked through a JMutexAutoLock before the lookup.
// NOTE(review): behaviour for an unknown peer_id depends on getPeer() --
// confirm whether it throws or can return null.
1678 float Connection::GetPeerAvgRTT(u16 peer_id)
1680 JMutexAutoLock peerlock(m_peers_mutex);
1681 return getPeer(peer_id)->avg_rtt;
// Prepares a command requesting removal of the given peer; unlike
// deletePeer(), this does not touch m_peers directly.
// peer_id: id of the peer to remove.
// NOTE(review): presumably the command is then queued via putCommand() --
// confirm.
1684 void Connection::DeletePeer(u16 peer_id)
1686 ConnectionCommand c;
1687 c.deletePeer(peer_id);
// Writes this connection's description (getDesc()) followed by ": " to out,
// with no trailing newline, so callers can append their message to it.
// out: stream to write the prefix to.
1691 void Connection::PrintInfo(std::ostream &out)
1693 out<<getDesc()<<": ";
// Convenience overload: writes the description prefix to dout_con.
1696 void Connection::PrintInfo()
1698 PrintInfo(dout_con);
// Builds a short identifying tag of the form "con(<socket handle>/<peer id>)",
// using m_socket.GetHandle() and m_peer_id converted with itos().
// Used as the prefix for log output in PrintInfo() and Receive().
1701 std::string Connection::getDesc()
1703 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";