3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
// Reads the sender peer id: a u16 stored at byte offset 4 of the packet data.
// The caller must supply at least 6 readable bytes; no length check happens here.
33 static u16 readPeerId(u8 *packetdata)
35 return readU16(&packetdata[4]);
// Reads the channel number: a u8 stored at byte offset 6 of the packet data.
// The caller must supply at least 7 readable bytes; no length check happens here.
37 static u8 readChannel(u8 *packetdata)
39 return readU8(&packetdata[6]);
// Builds a BufferedPacket of datasize + BASE_HEADER_SIZE bytes: the base header
// followed by a copy of datasize bytes from data.
// Base header fields written here:
//   [0] u32 protocol_id
//   [4] u16 sender_peer_id
//   [6] u8  channel
// NOTE(review): address is not referenced in the lines visible in this view,
// and the return statement is not visible either - confirm against the full file.
42 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
43 u32 protocol_id, u16 sender_peer_id, u8 channel)
45 u32 packet_size = datasize + BASE_HEADER_SIZE;
46 BufferedPacket p(packet_size);
49 writeU32(&p.data[0], protocol_id);
50 writeU16(&p.data[4], sender_peer_id);
51 writeU8(&p.data[6], channel);
// Payload goes directly after the base header.
53 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
// Convenience overload: forwards the buffer contents and its size to the
// raw-pointer makePacket overload, passing the other arguments through unchanged.
58 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
59 u32 protocol_id, u16 sender_peer_id, u8 channel)
61 return makePacket(address, *data, data.getSize(),
62 protocol_id, sender_peer_id, channel);
// Wraps data in a TYPE_ORIGINAL packet: a new buffer of
// data.getSize() + header_size bytes with the type byte at offset 0 and the
// payload copied after the header.
// NOTE(review): the declaration of header_size and the return statement are
// not visible in this view.
65 SharedBuffer<u8> makeOriginalPacket(
66 SharedBuffer<u8> data)
69 u32 packet_size = data.getSize() + header_size;
70 SharedBuffer<u8> b(packet_size);
72 writeU8(&b[0], TYPE_ORIGINAL);
74 memcpy(&b[header_size], *data, data.getSize());
// Cuts data into TYPE_SPLIT chunk packets and collects them in a list.
// Chunk header (7 bytes) as written below:
//   [0] u8  TYPE_SPLIT
//   [1] u16 seqnum
//   [3] u16 chunk_count (filled in by the second loop, once the count is known)
//   [5] u16 chunk_num
// Each chunk carries at most chunksize_max - 7 payload bytes.
// NOTE(review): part of the parameter list, the declarations of start, end,
// chunk_num and chunk_count, the loop head and the return are not visible in
// this view.
79 std::list<SharedBuffer<u8> > makeSplitPacket(
80 SharedBuffer<u8> data,
84 // Chunk packets, containing the TYPE_SPLIT header
85 std::list<SharedBuffer<u8> > chunks;
87 u32 chunk_header_size = 7;
// NOTE(review): this is a u32 result; presumably it wraps to a huge value when
// chunksize_max is 7 or less - confirm callers always pass a larger value.
88 u32 maximum_data_size = chunksize_max - chunk_header_size;
// end is the inclusive index of the last payload byte of this chunk,
// clamped to the last byte of data.
94 end = start + maximum_data_size - 1;
// NOTE(review): if data.getSize() can be 0 and is unsigned, getSize() - 1
// wraps here and in the loop condition - confirm empty input never reaches
// this function.
95 if(end > data.getSize() - 1)
96 end = data.getSize() - 1;
98 u32 payload_size = end - start + 1;
99 u32 packet_size = chunk_header_size + payload_size;
101 SharedBuffer<u8> chunk(packet_size);
103 writeU8(&chunk[0], TYPE_SPLIT);
104 writeU16(&chunk[1], seqnum);
105 // [3] u16 chunk_count is written at next stage
106 writeU16(&chunk[5], chunk_num);
107 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
109 chunks.push_back(chunk);
// The loop stops once the chunk just emitted ended on the last byte of data.
115 while(end != data.getSize() - 1);
// Second pass: stamp the final chunk count into every chunk header.
117 for(std::list<SharedBuffer<u8> >::iterator i = chunks.begin();
118 i != chunks.end(); ++i)
121 writeU16(&((*i)[3]), chunk_count);
// Produces the packet list for data: split chunks via makeSplitPacket when
// data plus the 1-byte original header is larger than chunksize_max.
// Presumably a single TYPE_ORIGINAL packet is pushed otherwise; the else
// branch structure is not visible in this view.
// NOTE(review): part of the parameter list and the return are not visible here.
127 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
128 SharedBuffer<u8> data,
132 u32 original_header_size = 1;
133 std::list<SharedBuffer<u8> > list;
134 if(data.getSize() + original_header_size > chunksize_max)
136 list = makeSplitPacket(data, chunksize_max, split_seqnum);
142 list.push_back(makeOriginalPacket(data));
// Wraps data in a TYPE_RELIABLE packet. Header as written below:
//   [0] u8  TYPE_RELIABLE
//   [1] u16 seqnum
// followed by a copy of the payload at offset header_size.
// NOTE(review): the seqnum parameter line, the declaration of header_size and
// the return are not visible in this view.
147 SharedBuffer<u8> makeReliablePacket(
148 SharedBuffer<u8> data,
151 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
152 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
153 <<((unsigned int)data[0]&0xff)<<std::endl;*/
155 u32 packet_size = data.getSize() + header_size;
156 SharedBuffer<u8> b(packet_size);
158 writeU8(&b[0], TYPE_RELIABLE);
159 writeU16(&b[1], seqnum);
161 memcpy(&b[header_size], *data, data.getSize());
163 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
164 <<((unsigned int)data[0]&0xff)<<std::endl;*/
165 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
// Constructs an empty buffer; the cached element count m_list_size starts at 0.
173 ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {}
// Debug helper: walks m_list and reads the sequence number of each packet
// (u16 at BASE_HEADER_SIZE + 1).
// NOTE(review): the output statement is not visible in this view.
175 void ReliablePacketBuffer::print()
177 for(std::list<BufferedPacket>::iterator i = m_list.begin();
181 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
// Returns true when no packets are buffered.
185 bool ReliablePacketBuffer::empty()
187 return m_list.empty();
// Returns the number of buffered packets as a u32.
// NOTE(review): the body is not visible in this view; presumably it returns
// the cached m_list_size - confirm.
189 u32 ReliablePacketBuffer::size()
// Linear search through m_list for a packet with the given sequence number.
// The seqnum of each entry is the u16 at BASE_HEADER_SIZE + 1.
// NOTE(review): the comparison and the return are not visible in this view;
// callers compare the result against notFound().
193 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
195 std::list<BufferedPacket>::iterator i = m_list.begin();
196 for(; i != m_list.end(); ++i)
198 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
199 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
200 <<", comparing to s="<<s<<std::endl;*/
// Returns the search result value that stands for no match.
// NOTE(review): the body is not visible in this view; presumably it is the
// end iterator of m_list - confirm.
206 RPBSearchResult ReliablePacketBuffer::notFound()
// Writes the sequence number of the first buffered packet into *result.
// NOTE(review): the empty-list guard and the return values are not visible in
// this view; the caller treats the bool as found / not found.
210 bool ReliablePacketBuffer::getFirstSeqnum(u16 *result)
// Takes a copy of the front packet just to read its header.
214 BufferedPacket p = *m_list.begin();
215 *result = readU16(&p.data[BASE_HEADER_SIZE+1]);
// Removes the first packet from the buffer and hands back a copy of it.
// Throws NotFoundException with the message shown below; the guard condition
// is not visible in this view (presumably an empty-list check).
// NOTE(review): any m_list_size bookkeeping and the return are not visible here.
218 BufferedPacket ReliablePacketBuffer::popFirst()
221 throw NotFoundException("Buffer is empty");
// Copy first, then erase, so the copy stays valid.
222 BufferedPacket p = *m_list.begin();
223 m_list.erase(m_list.begin());
// Looks up the packet carrying seqnum via findPacket and copies it out.
// Throws NotFoundException when the lookup fails (the condition line is not
// visible in this view).
// NOTE(review): the erase and return statements are not visible here.
227 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
229 RPBSearchResult r = findPacket(seqnum);
231 dout_con<<"Not found"<<std::endl;
232 throw NotFoundException("seqnum not found in buffer");
234 BufferedPacket p = *r;
// Inserts a TYPE_RELIABLE packet into m_list at a position chosen by its
// sequence number.
// Preconditions (asserted): the packet holds at least BASE_HEADER_SIZE + 3
// bytes and its type byte is TYPE_RELIABLE.
// Throws AlreadyExistsException; the triggering comparison is not visible in
// this view (presumably an equal seqnum already in the list).
// NOTE(review): the actual insert and push_back calls are not visible here.
239 void ReliablePacketBuffer::insert(BufferedPacket &p)
241 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
242 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
243 assert(type == TYPE_RELIABLE);
244 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
247 // Find the right place for the packet and insert it there
249 // If list is empty, just add it
256 // Otherwise find the right place
257 std::list<BufferedPacket>::iterator i = m_list.begin();
258 // Find the first packet in the list which has a higher seqnum
259 for(; i != m_list.end(); ++i){
260 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
263 throw AlreadyExistsException("Same seqnum in list");
// seqnum_higher is used rather than a plain comparison; presumably it
// handles u16 wrap-around - confirm against its definition.
265 if(seqnum_higher(s, seqnum)){
269 // If we're at the end of the list, add the packet to the
271 if(i == m_list.end())
// Advances the timers of every buffered packet by dtime.
// totaltime is increased below; presumably the per-resend time field is
// increased on a line not visible in this view.
281 void ReliablePacketBuffer::incrementTimeouts(float dtime)
283 for(std::list<BufferedPacket>::iterator i = m_list.begin();
284 i != m_list.end(); ++i)
287 i->totaltime += dtime;
// Visits every buffered packet whose time field has reached timeout.
// NOTE(review): the action taken on a match is not visible in this view;
// going by the function name, presumably time is reset to zero - confirm.
291 void ReliablePacketBuffer::resetTimedOuts(float timeout)
293 for(std::list<BufferedPacket>::iterator i = m_list.begin();
294 i != m_list.end(); ++i)
296 if(i->time >= timeout)
// Checks whether any buffered packet has a totaltime of at least timeout.
// NOTE(review): the return statements are not visible in this view.
301 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
303 for(std::list<BufferedPacket>::iterator i = m_list.begin();
304 i != m_list.end(); ++i)
306 if(i->totaltime >= timeout)
// Collects copies of all buffered packets whose time field has reached
// timeout. The packets stay in the buffer; only copies go into the result.
// NOTE(review): the return statement is not visible in this view.
312 std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
314 std::list<BufferedPacket> timed_outs;
315 for(std::list<BufferedPacket>::iterator i = m_list.begin();
316 i != m_list.end(); ++i)
318 if(i->time >= timeout)
319 timed_outs.push_back(*i);
// Destructor: iterates over every entry of m_buf.
// NOTE(review): the loop body is not visible in this view; since m_buf holds
// raw IncomingSplitPacket pointers, presumably each one is deleted - confirm.
328 IncomingSplitBuffer::~IncomingSplitBuffer()
330 for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
331 i != m_buf.end(); ++i)
337 Returns the reassembled data once every chunk of a split packet has
338 arrived; until then an empty buffer is returned.
// Stores one TYPE_SPLIT chunk and, once all chunks of that split packet are
// present, concatenates them into one buffer.
// Returns an empty SharedBuffer while chunks are still missing or when the
// chunk is a duplicate.
// Expected layout after the base header (7 bytes, asserted below):
//   [0] u8 TYPE_SPLIT, [1] u16 seqnum, [3] u16 chunk_count, [5] u16 chunk_num
// NOTE(review): several short lines (m_buf insertion, totalsize / start
// declarations, removal from m_buf, final return) are not visible in this view.
340 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
342 u32 headersize = BASE_HEADER_SIZE + 7;
343 assert(p.data.getSize() >= headersize);
344 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
345 assert(type == TYPE_SPLIT);
346 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
347 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
348 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
350 // Add if doesn't exist
351 if(m_buf.find(seqnum) == m_buf.end())
353 IncomingSplitPacket *sp = new IncomingSplitPacket();
354 sp->chunk_count = chunk_count;
355 sp->reliable = reliable;
359 IncomingSplitPacket *sp = m_buf[seqnum];
361 // TODO: These errors should be thrown or something? Dunno.
// Mismatches against the values recorded with the first chunk are only
// logged; processing continues.
362 if(chunk_count != sp->chunk_count)
363 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
364 <<" != sp->chunk_count="<<sp->chunk_count
366 if(reliable != sp->reliable)
367 derr_con<<"Connection: WARNING: reliable="<<reliable
368 <<" != sp->reliable="<<sp->reliable
// NOTE(review): chunk_num is not checked against chunk_count in the visible
// lines - confirm out-of-range chunk numbers are handled.
371 // If chunk already exists, ignore it.
372 // Sometimes two identical packets may arrive when there is network
373 // lag and the server re-sends stuff.
374 if(sp->chunks.find(chunk_num) != sp->chunks.end())
375 return SharedBuffer<u8>();
377 // Cut chunk data out of packet
378 u32 chunkdatasize = p.data.getSize() - headersize;
379 SharedBuffer<u8> chunkdata(chunkdatasize);
380 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
382 // Set chunk data in buffer
383 sp->chunks[chunk_num] = chunkdata;
385 // If not all chunks are received, return empty buffer
386 if(sp->allReceived() == false)
387 return SharedBuffer<u8>();
389 // Calculate total size
391 for(std::map<u16, SharedBuffer<u8> >::iterator i = sp->chunks.begin();
392 i != sp->chunks.end(); ++i)
394 totalsize += i->second.getSize();
397 SharedBuffer<u8> fulldata(totalsize);
399 // Copy chunks to data buffer
401 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
// operator[] on the map creates a default entry when chunk_i is absent.
404 SharedBuffer<u8> buf = sp->chunks[chunk_i];
// NOTE(review): the size is narrowed to u16 here while it is held in a u32
// when the chunk is stored above; a chunk above 65535 bytes would be copied
// only partially - confirm chunk sizes are bounded below that.
405 u16 chunkdatasize = buf.getSize();
406 memcpy(&fulldata[start], *buf, chunkdatasize);
407 start += chunkdatasize;;
410 // Remove sp from buffer
// Drops incomplete unreliable split packets whose time has reached timeout.
// Works in two passes: the first collects the keys to remove, the second walks
// that list, so m_buf is not modified while it is being iterated.
// NOTE(review): the statement that would use dtime and the actual removal
// from m_buf are not visible in this view.
416 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
418 std::list<u16> remove_queue;
419 for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
420 i != m_buf.end(); ++i)
422 IncomingSplitPacket *p = i->second;
423 // Reliable ones are not removed by timeout
424 if(p->reliable == true)
427 if(p->time >= timeout)
428 remove_queue.push_back(i->first);
430 for(std::list<u16>::iterator j = remove_queue.begin();
431 j != remove_queue.end(); ++j)
433 dout_con<<"NOTE: Removing timed out unreliable split packet"
// All three sequence counters start at SEQNUM_INITIAL.
// NOTE(review): the signature of the enclosing definition is not visible in
// this view; presumably this is the Channel constructor body - confirm.
446 next_outgoing_seqnum = SEQNUM_INITIAL;
447 next_incoming_seqnum = SEQNUM_INITIAL;
448 next_outgoing_split_seqnum = SEQNUM_INITIAL;
// Constructs a peer record. Initial values visible in this view:
//   timeout_counter 0, has_sent_with_id false,
//   m_max_packets_per_second 10,
//   congestion control: aim RTT 0.2, max rate 400, min rate 10.
// NOTE(review): further initializers (for instance those using a_id and
// a_address) are not visible here.
458 Peer::Peer(u16 a_id, Address a_address):
461 timeout_counter(0.0),
465 has_sent_with_id(false),
467 m_max_packets_per_second(10),
470 congestion_control_aim_rtt(0.2),
471 congestion_control_max_rate(400),
472 congestion_control_min_rate(10)
// Feeds one round-trip-time sample into the peer state. Three things happen
// in the visible code:
//  1. The packet rate m_max_packets_per_second is raised (by 10 or by 2, never
//     above congestion_control_max_rate) or cut to 80 percent (never below
//     congestion_control_min_rate).
//  2. avg_rtt is updated as 0.1 * sample + 0.9 * previous average.
//  3. resend_timeout is set to avg_rtt * RESEND_TIMEOUT_FACTOR, clamped to
//     the range RESEND_TIMEOUT_MIN .. RESEND_TIMEOUT_MAX.
// NOTE(review): the first condition of the rate adjustment and the handling
// of special rtt / avg_rtt values are not visible in this view.
479 void Peer::reportRTT(float rtt)
483 if(m_max_packets_per_second < congestion_control_max_rate)
484 m_max_packets_per_second += 10;
485 } else if(rtt < congestion_control_aim_rtt){
486 if(m_max_packets_per_second < congestion_control_max_rate)
487 m_max_packets_per_second += 2;
// RTT at or above the aim: back off multiplicatively, with a floor.
489 m_max_packets_per_second *= 0.8;
490 if(m_max_packets_per_second < congestion_control_min_rate)
491 m_max_packets_per_second = congestion_control_min_rate;
497 else if(avg_rtt < 0.0)
500 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
502 // Calculate resend_timeout
504 /*int reliable_count = 0;
505 for(int i=0; i<CHANNEL_COUNT; i++)
507 reliable_count += channels[i].outgoing_reliables.size();
509 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
510 * ((float)reliable_count * 1);*/
512 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
513 if(timeout < RESEND_TIMEOUT_MIN)
514 timeout = RESEND_TIMEOUT_MIN;
515 if(timeout > RESEND_TIMEOUT_MAX)
516 timeout = RESEND_TIMEOUT_MAX;
517 resend_timeout = timeout;
// Constructs a connection with no peer handler (m_bc_peerhandler is NULL).
// Stores protocol_id and max_packet_size, zeroes m_bc_receive_timeout and
// sets the socket timeout to 5 ms.
// NOTE(review): the rest of the parameter list and some initializers are not
// visible in this view.
524 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
526 m_protocol_id(protocol_id),
527 m_max_packet_size(max_packet_size),
531 m_bc_peerhandler(NULL),
532 m_bc_receive_timeout(0),
535 m_socket.setTimeoutMs(5);
// Constructs a connection that keeps the given peerhandler pointer.
// Otherwise identical, in the visible lines, to the overload without a peer
// handler: stores protocol_id and max_packet_size, zeroes
// m_bc_receive_timeout and sets the socket timeout to 5 ms.
// NOTE(review): some initializers (for instance those using timeout and ipv6)
// are not visible in this view.
540 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
541 bool ipv6, PeerHandler *peerhandler):
542 m_protocol_id(protocol_id),
543 m_max_packet_size(max_packet_size),
547 m_bc_peerhandler(peerhandler),
548 m_bc_receive_timeout(0),
551 m_socket.setTimeoutMs(5);
// Destructor: iterates over every entry of m_peers.
// NOTE(review): the loop body and any thread shutdown are not visible in this
// view; since m_peers holds raw Peer pointers, presumably each is deleted -
// confirm.
557 Connection::~Connection()
561 for(std::map<u16, Peer*>::iterator
563 j != m_peers.end(); ++j)
// Thread entry point of the connection. In the visible lines each pass
// measures the elapsed time in seconds since the previous pass and drains
// m_command_queue.
// NOTE(review): the outer loop head and the calls that handle each command
// and run send / receive / timeouts are not visible in this view.
571 void * Connection::Thread()
574 log_register_thread("Connection");
576 dout_con<<"Connection thread started"<<std::endl;
578 u32 curtime = porting::getTimeMs();
579 u32 lasttime = curtime;
583 BEGIN_DEBUG_EXCEPTION_HANDLER
586 curtime = porting::getTimeMs();
// Millisecond difference converted to seconds.
587 float dtime = (float)(curtime - lasttime) / 1000.;
595 while(!m_command_queue.empty()){
596 ConnectionCommand c = m_command_queue.pop_front();
604 END_DEBUG_EXCEPTION_HANDLER(derr_con);
// Appends an event to m_event_queue. Events of type CONNEVENT_NONE are
// rejected by assertion.
610 void Connection::putEvent(ConnectionEvent &e)
612 assert(e.type != CONNEVENT_NONE);
613 m_event_queue.push_back(e);
// Executes one queued ConnectionCommand: logs which command is being handled
// and dispatches to the matching member function.
// NOTE(review): the switch head, several case labels, the break statements
// and the calls for SERVE / CONNECT / DISCONNECT are not visible in this view.
616 void Connection::processCommand(ConnectionCommand &c)
620 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
623 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
627 case CONNCMD_CONNECT:
628 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
631 case CONNCMD_DISCONNECT:
632 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
636 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
637 send(c.peer_id, c.channelnum, c.data, c.reliable);
639 case CONNCMD_SEND_TO_ALL:
640 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
641 sendToAll(c.channelnum, c.data, c.reliable);
643 case CONNCMD_DELETE_PEER:
644 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
// Second argument false; in runTimeouts the same call passes true.
645 deletePeer(c.peer_id, false);
// Flushes m_outgoing_queue subject to a per-peer packet budget.
// Step 1: every peer accumulates dtime and gets a budget for this call of
//         m_sendtime_accu * m_max_packets_per_second packets.
// Step 2: each queued packet is either sent with rawSendAsPacket or postponed.
//         A packet is postponed when its channel already holds 5 or more
//         outgoing reliables, or when the budget of the peer is used up.
// Step 3: postponed packets are put back on the queue in their original order.
// Step 4: the time that corresponds to the packets sent is subtracted from the
//         accumulator, which is then capped at the time worth of 10 packets.
// NOTE(review): the handling of a missing peer and the m_num_sent increment
// are not visible in this view.
650 void Connection::send(float dtime)
652 for(std::map<u16, Peer*>::iterator
654 j != m_peers.end(); ++j)
656 Peer *peer = j->second;
657 peer->m_sendtime_accu += dtime;
658 peer->m_num_sent = 0;
659 peer->m_max_num_sent = peer->m_sendtime_accu *
660 peer->m_max_packets_per_second;
662 Queue<OutgoingPacket> postponed_packets;
663 while(!m_outgoing_queue.empty()){
664 OutgoingPacket packet = m_outgoing_queue.pop_front();
665 Peer *peer = getPeerNoEx(packet.peer_id);
668 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
669 postponed_packets.push_back(packet);
670 } else if(peer->m_num_sent < peer->m_max_num_sent){
671 rawSendAsPacket(packet.peer_id, packet.channelnum,
672 packet.data, packet.reliable);
675 postponed_packets.push_back(packet);
678 while(!postponed_packets.empty()){
679 m_outgoing_queue.push_back(postponed_packets.pop_front());
681 for(std::map<u16, Peer*>::iterator
683 j != m_peers.end(); ++j)
685 Peer *peer = j->second;
686 peer->m_sendtime_accu -= (float)peer->m_num_sent /
687 peer->m_max_packets_per_second;
688 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
689 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
693 // Receive packets from the network and buffers and create ConnectionEvents
// Outline of one loop pass, as far as it is visible in this view:
//  1. Deliver data that is already waiting in the per-peer buffers.
//  2. Read one datagram from the socket; drop it when it is shorter than the
//     base header or carries a different protocol id.
//  3. Resolve the sender: a packet with PEER_ID_INEXISTENT is matched against
//     known peers by address, or a new peer id is allocated and announced
//     with a CONTROLTYPE_SET_PEER_ID packet.
//  4. Reject packets whose peer is unknown or whose address differs from the
//     one recorded for that peer.
//  5. Strip the base header, run processPacket and publish the result as a
//     dataReceived event.
// NOTE(review): many short lines (loop braces, continue / break / return
// statements, try heads, event declarations) are not visible in this view.
694 void Connection::receive()
696 u32 datasize = m_max_packet_size * 2; // Double it just to be safe
697 // TODO: We can not know how many layers of header there are.
698 // For now, just assume there are no other than the base headers.
699 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
700 SharedBuffer<u8> packetdata(packet_maxsize);
702 bool single_wait_done = false;
704 for(u32 loop_i=0; loop_i<1000; loop_i++) // Limit in case of DoS
707 /* Check if some buffer has relevant data */
710 SharedBuffer<u8> resultdata;
711 bool got = getFromBuffers(peer_id, resultdata);
714 e.dataReceived(peer_id, resultdata);
// After the first pass the socket is only polled with a zero wait.
720 if(single_wait_done){
721 if(m_socket.WaitData(0) == false)
725 single_wait_done = true;
728 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
730 if(received_size < 0)
732 if(received_size < BASE_HEADER_SIZE)
734 if(readU32(&packetdata[0]) != m_protocol_id)
737 u16 peer_id = readPeerId(*packetdata);
738 u8 channelnum = readChannel(*packetdata);
739 if(channelnum > CHANNEL_COUNT-1){
// NOTE(review): channelnum is a u8 streamed without the int cast that other
// log lines in this file apply; it may print as a character, not a number.
741 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
742 throw InvalidIncomingDataException("Channel doesn't exist");
745 if(peer_id == PEER_ID_INEXISTENT)
748 Somebody is trying to send stuff to us with no peer id.
750 Check if the same address and port was added to our peer
752 Allow only entries that have has_sent_with_id==false.
755 std::map<u16, Peer*>::iterator j;
757 for(; j != m_peers.end(); ++j)
759 Peer *peer = j->second;
760 if(peer->has_sent_with_id)
762 if(peer->address == sender)
767 If no peer was found with the same address and port,
768 we shall assume it is a new peer and create an entry.
770 if(j == m_peers.end())
772 // Pass on to adding the peer
774 // Else: A peer was found.
777 Peer *peer = j->second;
780 derr_con<<"WARNING: Assuming unknown peer to be "
781 <<"peer_id="<<peer_id<<std::endl;
786 The peer was not found in our lists. Add it.
788 if(peer_id == PEER_ID_INEXISTENT)
790 // Somebody wants to make a new connection
792 // Get a unique peer id (2 or higher)
795 Find an unused peer id
797 bool out_of_ids = false;
801 if(m_peers.find(peer_id_new) == m_peers.end())
803 // Check for overflow
804 if(peer_id_new == 65535){
811 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
816 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
817 " giving peer_id="<<peer_id_new<<std::endl;
820 Peer *peer = new Peer(peer_id_new, sender);
821 m_peers[peer->id] = peer;
823 // Create peer addition event
825 e.peerAdded(peer_id_new, sender);
828 // Create CONTROL packet to tell the peer id to the new peer.
829 SharedBuffer<u8> reply(4);
830 writeU8(&reply[0], TYPE_CONTROL);
831 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
832 writeU16(&reply[2], peer_id_new);
// Queued on channel 0 as a reliable packet.
833 sendAsPacket(peer_id_new, 0, reply, true);
835 // We're now talking to a valid peer_id
836 peer_id = peer_id_new;
838 // Go on and process whatever it sent
841 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
843 if(node == m_peers.end())
846 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
847 // and it is invalid.
849 derr_con<<"Receive(): Peer not found"<<std::endl;
850 throw InvalidIncomingDataException("Peer not found (possible timeout)");
853 Peer *peer = node->second;
855 // Validate peer address
856 if(peer->address != sender)
859 derr_con<<"Peer "<<peer_id<<" sending from different address."
860 " Ignoring."<<std::endl;
// Any accepted packet counts as a sign of life from the peer.
864 peer->timeout_counter = 0.0;
866 Channel *channel = &(peer->channels[channelnum]);
868 // Throw the received packet to channel->processPacket()
870 // Make a new SharedBuffer from the data without the base headers
// received_size was checked against BASE_HEADER_SIZE earlier in this pass;
// the action on failure is not visible in this view.
871 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
872 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
873 strippeddata.getSize());
876 // Process it (the result is some data with no headers made by us)
877 SharedBuffer<u8> resultdata = processPacket
878 (channel, strippeddata, peer_id, channelnum, false);
881 dout_con<<"ProcessPacket returned data of size "
882 <<resultdata.getSize()<<std::endl;
885 e.dataReceived(peer_id, resultdata);
888 }catch(ProcessedSilentlyException &e){
890 }catch(InvalidIncomingDataException &e){
892 catch(ProcessedSilentlyException &e){
// Periodic housekeeping for all peers; dtime is the time step added to each
// timer. For every peer, as far as visible in this view:
//  - congestion control parameters are refreshed from g_settings,
//  - the peer is scheduled for removal when its timeout_counter exceeds
//    m_timeout or when any outgoing reliable packet has a totaltime of at
//    least m_timeout,
//  - timed out unreliable split packets are discarded,
//  - outgoing reliables whose time reached resend_timeout are collected for
//    re-sending and the timeout is reported to the peer as an RTT sample,
//  - a reliable PING is sent once ping_timer reaches 5.0.
// Removal is deferred to a final loop, so m_peers is not modified while it is
// being iterated.
// NOTE(review): continue / goto statements and the actual re-send call are
// not visible in this view.
897 void Connection::runTimeouts(float dtime)
899 float congestion_control_aim_rtt
900 = g_settings->getFloat("congestion_control_aim_rtt");
901 float congestion_control_max_rate
902 = g_settings->getFloat("congestion_control_max_rate");
903 float congestion_control_min_rate
904 = g_settings->getFloat("congestion_control_min_rate");
906 std::list<u16> timeouted_peers;
907 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
908 j != m_peers.end(); ++j)
910 Peer *peer = j->second;
912 // Update congestion control values
913 peer->congestion_control_aim_rtt = congestion_control_aim_rtt;
914 peer->congestion_control_max_rate = congestion_control_max_rate;
915 peer->congestion_control_min_rate = congestion_control_min_rate;
920 peer->timeout_counter += dtime;
921 if(peer->timeout_counter > m_timeout)
924 derr_con<<"RunTimeouts(): Peer "<<peer->id
926 <<" (source=peer->timeout_counter)"
928 // Add peer to the list
929 timeouted_peers.push_back(peer->id);
930 // Don't bother going through the buffers of this one
// Cached once per peer; reportRTT below may change peer->resend_timeout.
934 float resend_timeout = peer->resend_timeout;
935 for(u16 i=0; i<CHANNEL_COUNT; i++)
937 std::list<BufferedPacket> timed_outs;
939 Channel *channel = &peer->channels[i];
941 // Remove timed out incomplete unreliable split packets
942 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
944 // Increment reliable packet times
945 channel->outgoing_reliables.incrementTimeouts(dtime);
947 // Check reliable packet total times, remove peer if
949 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
952 derr_con<<"RunTimeouts(): Peer "<<peer->id
954 <<" (source=reliable packet totaltime)"
956 // Add peer to the to-be-removed list
957 timeouted_peers.push_back(peer->id);
961 // Re-send timed out outgoing reliables
963 timed_outs = channel->
964 outgoing_reliables.getTimedOuts(resend_timeout);
966 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
// NOTE(review): inside this loop j and channel are redeclared and shadow the
// outer peer iterator j and the Channel pointer declared above.
968 for(std::list<BufferedPacket>::iterator j = timed_outs.begin();
969 j != timed_outs.end(); ++j)
971 u16 peer_id = readPeerId(*(j->data));
972 u8 channel = readChannel(*(j->data));
973 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
976 derr_con<<"RE-SENDING timed-out RELIABLE to ";
977 j->address.print(&derr_con);
978 derr_con<<"(t/o="<<resend_timeout<<"): "
979 <<"from_peer_id="<<peer_id
980 <<", channel="<<((int)channel&0xff)
981 <<", seqnum="<<seqnum
986 // Enlarge avg_rtt and resend_timeout:
987 // The rtt will be at least the timeout.
988 // NOTE: This won't affect the timeout of the next
989 // checked channel because it was cached.
990 peer->reportRTT(resend_timeout);
997 peer->ping_timer += dtime;
998 if(peer->ping_timer >= 5.0)
1000 // Create and send PING packet
1001 SharedBuffer<u8> data(2);
1002 writeU8(&data[0], TYPE_CONTROL);
1003 writeU8(&data[1], CONTROLTYPE_PING);
// Sent directly (not via the outgoing queue), on channel 0, reliable.
1004 rawSendAsPacket(peer->id, 0, data, true);
1006 peer->ping_timer = 0.0;
1013 // Remove timed out peers
1014 for(std::list<u16>::iterator i = timeouted_peers.begin();
1015 i != timeouted_peers.end(); ++i)
1017 PrintInfo(derr_con);
1018 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1019 deletePeer(*i, true);
// Starts serving: binds the socket to port and takes the peer id
// PEER_ID_SERVER. A SocketException is caught here.
// NOTE(review): the try head and the body of the catch handler are not
// visible in this view.
1023 void Connection::serve(u16 port)
1025 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1027 m_socket.Bind(port);
1028 m_peer_id = PEER_ID_SERVER;
1030 catch(SocketException &e){
// Starts a client connection to address.
// Throws ConnectionException when a peer with id PEER_ID_SERVER already
// exists. Otherwise registers the server as a peer, emits a peerAdded event,
// resets the own peer id to PEER_ID_INEXISTENT and sends an empty reliable
// packet on channel 0 so that the server can assign an id.
// NOTE(review): the event declaration and the putEvent and socket bind calls
// are not visible in this view.
1038 void Connection::connect(Address address)
1040 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1041 <<":"<<address.getPort()<<std::endl;
1043 std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1044 if(node != m_peers.end()){
1045 throw ConnectionException("Already connected to a server");
1048 Peer *peer = new Peer(PEER_ID_SERVER, address);
1049 m_peers[peer->id] = peer;
1053 e.peerAdded(peer->id, peer->address);
1058 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1059 m_peer_id = PEER_ID_INEXISTENT;
1060 SharedBuffer<u8> data(0);
// NOTE(review): this calls Send with a capital S, not the lowercase send
// member defined in this file - confirm which overload is intended.
1061 Send(PEER_ID_SERVER, 0, data, true);
// Tells every known peer that this side is leaving: builds a two-byte
// TYPE_CONTROL / CONTROLTYPE_DISCO packet and sends it to each peer on
// channel 0, unreliably and bypassing the outgoing queue.
1064 void Connection::disconnect()
1066 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1068 // Create and send DISCO packet
1069 SharedBuffer<u8> data(2);
1070 writeU8(&data[0], TYPE_CONTROL);
1071 writeU8(&data[1], CONTROLTYPE_DISCO);
1074 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1075 j != m_peers.end(); ++j)
1077 Peer *peer = j->second;
1078 rawSendAsPacket(peer->id, 0, data, false);
// Sends data to every peer in m_peers by calling the per-peer send overload
// with the same channel number and reliability flag.
1082 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1084 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1085 j != m_peers.end(); ++j)
1087 Peer *peer = j->second;
1088 send(peer->id, channelnum, data, reliable);
// Queues data for one peer. The payload is turned into one ORIGINAL packet or
// several SPLIT packets (makeAutoSplitPacket) sized to fit m_max_packet_size
// minus the headers, and each resulting packet is queued with sendAsPacket.
// channelnum must be below CHANNEL_COUNT (asserted).
// NOTE(review): the handling of an unknown peer and the condition guarding the
// RELIABLE_HEADER_SIZE subtraction are not visible in this view.
1092 void Connection::send(u16 peer_id, u8 channelnum,
1093 SharedBuffer<u8> data, bool reliable)
1095 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1097 assert(channelnum < CHANNEL_COUNT);
1099 Peer *peer = getPeerNoEx(peer_id);
1102 Channel *channel = &(peer->channels[channelnum]);
1104 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1106 chunksize_max -= RELIABLE_HEADER_SIZE;
1108 std::list<SharedBuffer<u8> > originals;
1109 originals = makeAutoSplitPacket(data, chunksize_max,
1110 channel->next_outgoing_split_seqnum);
1112 for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
1113 i != originals.end(); ++i)
1115 SharedBuffer<u8> original = *i;
1117 sendAsPacket(peer_id, channelnum, original, reliable);
// Appends one packet to m_outgoing_queue; nothing is written to the socket
// here. The queue is drained by the send overload that takes dtime.
1121 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1122 SharedBuffer<u8> data, bool reliable)
1124 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1125 m_outgoing_queue.push_back(packet);
// Builds the wire packet for data and hands it to the socket path.
// Reliable path: takes the next outgoing seqnum of the channel, wraps data in
// a RELIABLE header, adds the base header and stores the packet in
// outgoing_reliables; an AlreadyExistsException from that insert is only
// logged.
// Unreliable path: adds the base header only.
// NOTE(review): the reliable / unreliable branch heads, the handling of an
// unknown peer and the rawSend calls are not visible in this view.
1128 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1129 SharedBuffer<u8> data, bool reliable)
1131 Peer *peer = getPeerNoEx(peer_id);
1134 Channel *channel = &(peer->channels[channelnum]);
1138 u16 seqnum = channel->next_outgoing_seqnum;
1139 channel->next_outgoing_seqnum++;
// This local named reliable is a buffer and shadows the bool parameter.
1141 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1143 // Add base headers and make a packet
1144 BufferedPacket p = makePacket(peer->address, reliable,
1145 m_protocol_id, m_peer_id, channelnum);
1148 // Buffer the packet
1149 channel->outgoing_reliables.insert(p);
1151 catch(AlreadyExistsException &e)
1153 PrintInfo(derr_con);
1154 derr_con<<"WARNING: Going to send a reliable packet "
1155 "seqnum="<<seqnum<<" that is already "
1156 "in outgoing buffer"<<std::endl;
1165 // Add base headers and make a packet
1166 BufferedPacket p = makePacket(peer->address, data,
1167 m_protocol_id, m_peer_id, channelnum);
// Writes one finished packet to the socket. A SendFailedException is caught
// and logged together with the destination address; it is not propagated by
// the visible lines.
1174 void Connection::rawSend(const BufferedPacket &packet)
1177 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1178 } catch(SendFailedException &e){
1179 derr_con<<"Connection::rawSend(): SendFailedException: "
1180 <<packet.address.serializeString()<<std::endl;
// Returns the peer registered under peer_id.
// Throws PeerNotFoundException when no such peer exists.
1184 Peer* Connection::getPeer(u16 peer_id)
1186 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1188 if(node == m_peers.end()){
1189 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
// Sanity check: the map key and the id stored in the peer must agree.
1193 assert(node->second->id == peer_id);
1195 return node->second;
// Variant of getPeer without the exception.
// NOTE(review): the body of the not-found branch is not visible in this view;
// presumably NULL is returned - confirm, since callers dereference the result.
1198 Peer* Connection::getPeerNoEx(u16 peer_id)
1200 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1202 if(node == m_peers.end()){
// Sanity check: the map key and the id stored in the peer must agree.
1207 assert(node->second->id == peer_id);
1209 return node->second;
// Copies the Peer pointers of m_peers into a list, in map iteration order.
// The pointers still refer to the peers held by m_peers.
// NOTE(review): the return statement is not visible in this view.
1212 std::list<Peer*> Connection::getPeers()
1214 std::list<Peer*> list;
1215 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1216 j != m_peers.end(); ++j)
1218 Peer *peer = j->second;
1219 list.push_back(peer);
// Scans every channel of every peer and asks checkIncomingBuffers whether
// buffered data is ready for delivery.
// peer_id and dst are output parameters (passed by reference).
// NOTE(review): the handling of a hit (copy into dst, return true) and the
// final return are not visible in this view.
1224 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1226 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1227 j != m_peers.end(); ++j)
1229 Peer *peer = j->second;
1230 for(u16 i=0; i<CHANNEL_COUNT; i++)
1232 Channel *channel = &peer->channels[i];
1233 SharedBuffer<u8> resultdata;
1234 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
// Tries to release the next in-order reliable packet of one channel.
// First, buffered packets older than next_incoming_seqnum are dropped from
// the front. Then, if the first remaining packet carries exactly
// next_incoming_seqnum, it is popped, the expected seqnum is advanced, both
// headers are stripped and the payload is run through processPacket; the
// result is stored in dst and peer_id is set from the packet header.
// NOTE(review): loop heads, break statements and the return values are not
// visible in this view.
1244 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1245 SharedBuffer<u8> &dst)
1247 u16 firstseqnum = 0;
1248 // Clear old packets from start of buffer
1250 bool found = channel->incoming_reliables.getFirstSeqnum(&firstseqnum);
1253 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1254 channel->incoming_reliables.popFirst();
1258 // This happens if all packets are old
1260 if(channel->incoming_reliables.empty() == false)
1262 if(firstseqnum == channel->next_incoming_seqnum)
1264 BufferedPacket p = channel->incoming_reliables.popFirst();
1266 peer_id = readPeerId(*p.data);
1267 u8 channelnum = readChannel(*p.data);
1268 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1271 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1272 <<" seqnum="<<seqnum
1273 <<" peer_id="<<peer_id
1274 <<" channel="<<((int)channelnum&0xff)
1277 channel->next_incoming_seqnum++;
1279 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1280 // Get out the inside packet and re-process it
1281 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1282 memcpy(*payload, &p.data[headers_size], payload.getSize());
// Last argument true: the payload came out of a reliable packet.
1284 dst = processPacket(channel, payload, peer_id, channelnum, true);
// Interprets one packet whose base header has already been removed and
// dispatches on its first byte:
//   TYPE_CONTROL  - ACK, SET_PEER_ID, PING and DISCO are handled here and then
//                   signalled with ProcessedSilentlyException.
//   TYPE_ORIGINAL - the payload after ORIGINAL_HEADER_SIZE is extracted.
//   TYPE_SPLIT    - the chunk is stored in incoming_splits; full data is
//                   produced once the returned buffer is non-empty, otherwise
//                   ProcessedSilentlyException is thrown.
//   TYPE_RELIABLE - an ACK is sent back, future packets are buffered, old ones
//                   are rejected, and the expected one is unwrapped and
//                   processed recursively with reliable set to true.
// Throws InvalidIncomingDataException for truncated or unknown packets.
// reliable tells whether packetdata was itself carried by a reliable packet.
// NOTE(review): many short lines (else heads, try heads, return statements,
// some argument lines of makePacket) are not visible in this view.
1291 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1292 SharedBuffer<u8> packetdata, u16 peer_id,
1293 u8 channelnum, bool reliable)
1295 IndentationRaiser iraiser(&(m_indentation));
1297 if(packetdata.getSize() < 1)
1298 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1300 u8 type = readU8(&packetdata[0]);
1302 if(type == TYPE_CONTROL)
1304 if(packetdata.getSize() < 2)
1305 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1307 u8 controltype = readU8(&packetdata[1]);
1309 if(controltype == CONTROLTYPE_ACK)
1311 if(packetdata.getSize() < 4)
1312 throw InvalidIncomingDataException
1313 ("packetdata.getSize() < 4 (ACK header size)");
1315 u16 seqnum = readU16(&packetdata[2]);
1317 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1318 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1319 <<", seqnum="<<seqnum<<std::endl;
// The acknowledged packet leaves the resend buffer; its accumulated
// totaltime is used as the round trip time sample.
1322 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1323 // Get round trip time
1324 float rtt = p.totaltime;
1326 // Let peer calculate stuff according to it
1327 // (avg_rtt and resend_timeout)
1328 Peer *peer = getPeer(peer_id);
1329 peer->reportRTT(rtt);
1331 //PrintInfo(dout_con);
1332 //dout_con<<"RTT = "<<rtt<<std::endl;
1334 /*dout_con<<"OUTGOING: ";
1336 channel->outgoing_reliables.print();
1337 dout_con<<std::endl;*/
1339 catch(NotFoundException &e){
1340 PrintInfo(derr_con);
1341 derr_con<<"WARNING: ACKed packet not "
1346 throw ProcessedSilentlyException("Got an ACK");
1348 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1350 if(packetdata.getSize() < 4)
1351 throw InvalidIncomingDataException
1352 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1353 u16 peer_id_new = readU16(&packetdata[2]);
1355 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
// An id that was already assigned is kept; only a warning is logged.
1357 if(GetPeerID() != PEER_ID_INEXISTENT)
1359 PrintInfo(derr_con);
1360 derr_con<<"WARNING: Not changing"
1361 " existing peer id."<<std::endl;
1365 dout_con<<"changing."<<std::endl;
1366 SetPeerID(peer_id_new);
1368 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1370 else if(controltype == CONTROLTYPE_PING)
1372 // Just ignore it, the incoming data already reset
1373 // the timeout counter
1375 dout_con<<"PING"<<std::endl;
1376 throw ProcessedSilentlyException("Got a PING");
1378 else if(controltype == CONTROLTYPE_DISCO)
// NOTE(review): the two comment lines below match the PING branch, but this
// branch does not ignore the packet: it removes the peer via deletePeer.
1380 // Just ignore it, the incoming data already reset
1381 // the timeout counter
1383 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1385 if(deletePeer(peer_id, false) == false)
1387 PrintInfo(derr_con);
1388 derr_con<<"DISCO: Peer not found"<<std::endl;
1391 throw ProcessedSilentlyException("Got a DISCO");
1394 PrintInfo(derr_con);
1395 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1396 <<((int)controltype&0xff)<<std::endl;
1397 throw InvalidIncomingDataException("Invalid control type");
1400 else if(type == TYPE_ORIGINAL)
1402 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1403 throw InvalidIncomingDataException
1404 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1406 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1408 // Get the inside packet out and return it
1409 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1410 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1413 else if(type == TYPE_SPLIT)
1415 // We have to create a packet again for buffering
1416 // This isn't actually too bad an idea.
1417 BufferedPacket packet = makePacket(
1418 getPeer(peer_id)->address,
1423 // Buffer the packet
1424 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
// A non-empty buffer means the last missing chunk has just arrived.
1425 if(data.getSize() != 0)
1428 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1429 <<"size="<<data.getSize()<<std::endl;
1433 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1434 throw ProcessedSilentlyException("Buffered a split packet chunk");
1436 else if(type == TYPE_RELIABLE)
1438 // Recursive reliable packets not allowed
1440 throw InvalidIncomingDataException("Found nested reliable packets");
1442 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1443 throw InvalidIncomingDataException
1444 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1446 u16 seqnum = readU16(&packetdata[1]);
1448 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1449 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1452 if(is_future_packet)
1453 dout_con<<"BUFFERING";
1454 else if(is_old_packet)
1458 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1459 <<" next="<<channel->next_incoming_seqnum;
1460 dout_con<<" [sending CONTROLTYPE_ACK"
1461 " to peer_id="<<peer_id<<"]";
1462 dout_con<<std::endl;
1465 //assert(channel->incoming_reliables.size() < 100);
// The ACK is sent before the future / old / expected decision below, so it
// goes out in all three cases. It is sent unreliably and directly.
1467 // Send a CONTROLTYPE_ACK
1468 SharedBuffer<u8> reply(4);
1469 writeU8(&reply[0], TYPE_CONTROL);
1470 writeU8(&reply[1], CONTROLTYPE_ACK);
1471 writeU16(&reply[2], seqnum);
1472 rawSendAsPacket(peer_id, channelnum, reply, false);
1474 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1475 if(is_future_packet)
1478 dout_con<<"Buffering reliable packet (seqnum="
1479 <<seqnum<<")"<<std::endl;*/
1481 // This one comes later, buffer it.
1482 // Actually we have to make a packet to buffer one.
1483 // Well, we have all the ingredients, so just do it.
1484 BufferedPacket packet = makePacket(
1485 getPeer(peer_id)->address,
1491 channel->incoming_reliables.insert(packet);
1494 dout_con<<"INCOMING: ";
1495 channel->incoming_reliables.print();
1496 dout_con<<std::endl;*/
1498 catch(AlreadyExistsException &e)
1502 throw ProcessedSilentlyException("Buffered future reliable packet");
1504 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1505 else if(is_old_packet)
1507 // An old packet, dump it
1508 throw InvalidIncomingDataException("Got an old reliable packet");
// Reaching this point means seqnum equals the expected one.
1511 channel->next_incoming_seqnum++;
1513 // Get out the inside packet and re-process it
1514 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1515 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1517 return processPacket(channel, payload, peer_id, channelnum, true);
1521 PrintInfo(derr_con);
1522 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1523 throw InvalidIncomingDataException("Invalid packet type");
1526 // We should never get here.
1527 // If you get here, add an exception or a return to some of the
1528 // above conditionals.
1530 throw BaseException("Error in Channel::ProcessPacket()");
// Removes the peer registered under peer_id from m_peers.
//
// The id is first looked up in m_peers. For a known peer, peer_id, the
// timeout flag and the peer's address are handed to e.peerRemoved(), then the
// Peer object is deleted and its entry erased from the map.
//
// NOTE(review): the not-found branch body, the declaration and dispatch of
// `e`, and the return statements are not visible in this excerpt. A caller in
// this file treats a `false` result as "Peer not found" -- confirm that the
// found path returns true.
1533 bool Connection::deletePeer(u16 peer_id, bool timeout)
// Guard: is the id unknown to the peer map?
1535 if(m_peers.find(peer_id) == m_peers.end())
1538 Peer *peer = m_peers[peer_id];
// The address is read here, before the Peer object is destroyed below.
1542 e.peerRemoved(peer_id, timeout, peer->address);
// Free the Peer object, then drop its (now dangling) map entry.
1545 delete m_peers[peer_id];
1546 m_peers.erase(peer_id);
// Returns the next event from m_event_queue.
//
// When the queue is empty, an event `e` with type CONNEVENT_NONE is prepared
// instead of popping (presumably it is returned -- the declaration of `e` and
// that return are not visible in this excerpt; confirm).
1552 ConnectionEvent Connection::getEvent()
1554 if(m_event_queue.empty()){
1556 e.type = CONNEVENT_NONE;
// Queue is non-empty: hand out the front element.
1559 return m_event_queue.pop_front();
// Returns the next event from m_event_queue, forwarding timeout_ms to
// pop_front().
//
// If pop_front() throws ItemNotFoundException, an event `e` with type
// CONNEVENT_NONE is prepared instead (presumably returned -- the declaration
// of `e` and that return are not visible in this excerpt; confirm).
// NOTE(review): timeout_ms is presumably in milliseconds, judging by its
// name only -- verify against the queue implementation.
1562 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1565 return m_event_queue.pop_front(timeout_ms);
1566 } catch(ItemNotFoundException &ex){
1568 e.type = CONNEVENT_NONE;
// Appends command c to the back of m_command_queue.
1573 void Connection::putCommand(ConnectionCommand &c)
1575 m_command_queue.push_back(c);
// Entry point taking the port to serve on.
//
// NOTE(review): only the declaration of the ConnectionCommand is visible in
// this excerpt. Presumably the command is filled with `port` and queued via
// putCommand() -- confirm against the full source.
1578 void Connection::Serve(unsigned short port)
1580 ConnectionCommand c;
// Entry point taking the remote address to connect to.
//
// NOTE(review): only the declaration of the ConnectionCommand is visible in
// this excerpt. Presumably the command is filled with `address` and queued
// via putCommand() -- confirm against the full source.
1585 void Connection::Connect(Address address)
1587 ConnectionCommand c;
// Reports connection state based on three checks made while holding
// m_peers_mutex:
//   1. whether m_peers holds a number of entries other than one,
//   2. whether PEER_ID_SERVER is absent from m_peers,
//   3. whether m_peer_id is still PEER_ID_INEXISTENT.
//
// NOTE(review): the return statements are not visible in this excerpt.
// Presumably each check that fires returns false and passing all three
// returns true -- confirm against the full source.
1592 bool Connection::Connected()
// Scoped lock guarding access to m_peers for the rest of the function.
1594 JMutexAutoLock peerlock(m_peers_mutex);
1596 if(m_peers.size() != 1)
1599 std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1600 if(node == m_peers.end())
1603 if(m_peer_id == PEER_ID_INEXISTENT)
// Entry point for disconnecting.
//
// NOTE(review): only the declaration of the ConnectionCommand is visible in
// this excerpt. Presumably the command is set up as a disconnect request and
// queued via putCommand() -- confirm against the full source.
1609 void Connection::Disconnect()
1611 ConnectionCommand c;
// Waits for a connection event and translates it into the receive interface.
//
// peer_id  [out] set to the sender's id when data is received.
// data     [out] set to a SharedBuffer built from the event's data.
// Returns the size of the event's data for CONNEVENT_DATA_RECEIVED.
// Throws NoIncomingDataException when the event is CONNEVENT_NONE (and at the
// end of the function), and ConnectionBindFailed for CONNEVENT_BIND_FAILED.
//
// Peer-added / peer-removed events are forwarded to m_bc_peerhandler (if one
// is set) using a temporary Peer built from the event's id and address.
//
// NOTE(review): the enclosing loop/switch headers and the statements that end
// the PEER_ADDED / PEER_REMOVED cases are not visible in this excerpt, so
// what happens after the handler call (loop again vs. fall out) must be
// confirmed against the full source.
1616 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
// Wait for the next event, using m_bc_receive_timeout as the timeout.
1619 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
// Log every real event; CONNEVENT_NONE is not logged.
1620 if(e.type != CONNEVENT_NONE)
1621 dout_con<<getDesc()<<": Receive: got event: "
1622 <<e.describe()<<std::endl;
1624 case CONNEVENT_NONE:
1625 throw NoIncomingDataException("No incoming data");
1626 case CONNEVENT_DATA_RECEIVED:
1627 peer_id = e.peer_id;
1628 data = SharedBuffer<u8>(e.data);
1629 return e.data.getSize();
1630 case CONNEVENT_PEER_ADDED: {
// Temporary Peer that exists only for the duration of the handler call.
1631 Peer tmp(e.peer_id, e.address);
1632 if(m_bc_peerhandler)
1633 m_bc_peerhandler->peerAdded(&tmp);
1635 case CONNEVENT_PEER_REMOVED: {
// Temporary Peer that exists only for the duration of the handler call.
1636 Peer tmp(e.peer_id, e.address);
1637 if(m_bc_peerhandler)
1638 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1640 case CONNEVENT_BIND_FAILED:
1641 throw ConnectionBindFailed("Failed to bind socket "
1642 "(port already in use?)");
// Final fallback: report that no data is available.
1645 throw NoIncomingDataException("No incoming data");
// Builds a send-to-all command for `data` on channel `channelnum`, with the
// given `reliable` flag.
//
// Asserts that channelnum is below CHANNEL_COUNT.
// NOTE(review): the statement that hands the command on (presumably
// putCommand(c)) is not visible in this excerpt -- confirm.
1648 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1650 assert(channelnum < CHANNEL_COUNT);
1652 ConnectionCommand c;
1653 c.sendToAll(channelnum, data, reliable);
// Builds a send command for `data` addressed to peer `peer_id` on channel
// `channelnum`, with the given `reliable` flag.
//
// Asserts that channelnum is below CHANNEL_COUNT.
// NOTE(review): the statement that hands the command on (presumably
// putCommand(c)) is not visible in this excerpt -- confirm.
1657 void Connection::Send(u16 peer_id, u8 channelnum,
1658 SharedBuffer<u8> data, bool reliable)
1660 assert(channelnum < CHANNEL_COUNT);
1662 ConnectionCommand c;
1663 c.send(peer_id, channelnum, data, reliable);
// Takes a time step `dtime`.
// NOTE(review): the body of this function is not visible in this excerpt, so
// its behavior is intentionally left undocumented here -- confirm against
// the full source.
1667 void Connection::RunTimeouts(float dtime)
// Returns a copy of the address stored for peer `peer_id`.
//
// m_peers_mutex is held (via the scoped JMutexAutoLock) while the peer is
// looked up and its address read.
// NOTE(review): behavior for an unknown peer_id depends on getPeer(), which
// is not visible here -- confirm whether it throws or returns null.
1672 Address Connection::GetPeerAddress(u16 peer_id)
1674 JMutexAutoLock peerlock(m_peers_mutex);
1675 return getPeer(peer_id)->address;
// Returns the avg_rtt value stored for peer `peer_id`.
//
// m_peers_mutex is held (via the scoped JMutexAutoLock) while the peer is
// looked up and the value read.
// NOTE(review): the unit of avg_rtt and the behavior for an unknown peer_id
// (decided by getPeer()) are not visible here -- confirm.
1678 float Connection::GetPeerAvgRTT(u16 peer_id)
1680 JMutexAutoLock peerlock(m_peers_mutex);
1681 return getPeer(peer_id)->avg_rtt;
// Builds a delete-peer command for `peer_id`.
//
// Unlike deletePeer(), the visible code here does not touch m_peers itself;
// it only fills in a ConnectionCommand.
// NOTE(review): the statement that hands the command on (presumably
// putCommand(c)) is not visible in this excerpt -- confirm.
1684 void Connection::DeletePeer(u16 peer_id)
1686 ConnectionCommand c;
1687 c.deletePeer(peer_id);
// Writes this connection's description (getDesc()) followed by ": " to `out`.
// No newline is written, so callers can append their own message to the
// same line.
1691 void Connection::PrintInfo(std::ostream &out)
1693 out<<getDesc()<<": ";
// Convenience overload: writes the connection prefix to dout_con.
1696 void Connection::PrintInfo()
1698 PrintInfo(dout_con);
// Returns a short identifier for this connection of the form
// "con(<socket handle>/<peer id>)", built from m_socket.GetHandle() and
// m_peer_id (both converted with itos()).
1701 std::string Connection::getDesc()
1703 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";