3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
33 static u16 readPeerId(u8 *packetdata)
35 return readU16(&packetdata[4]);
37 static u8 readChannel(u8 *packetdata)
39 return readU8(&packetdata[6]);
// Build a BufferedPacket of datasize + BASE_HEADER_SIZE bytes: the base header
// (u32 protocol id at offset 0, u16 sender peer id at offset 4, u8 channel at
// offset 6) followed by a copy of the payload.
// NOTE(review): lines are missing from this listing (see the gaps in the
// leading line numbers); the braces, any use of `address` and the return
// statement are not visible here.
42 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
43 u32 protocol_id, u16 sender_peer_id, u8 channel)
45 u32 packet_size = datasize + BASE_HEADER_SIZE;
46 BufferedPacket p(packet_size);
// Base header fields
49 writeU32(&p.data[0], protocol_id);
50 writeU16(&p.data[4], sender_peer_id);
51 writeU8(&p.data[6], channel);
// Payload goes directly after the base header
53 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
58 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
59 u32 protocol_id, u16 sender_peer_id, u8 channel)
61 return makePacket(address, *data, data.getSize(),
62 protocol_id, sender_peer_id, channel);
// Wrap `data` in a TYPE_ORIGINAL packet: a type byte at offset 0 followed by
// a copy of the payload at offset header_size.
// NOTE(review): the definition of header_size and the return statement are
// not visible in this listing; makeAutoSplitPacket() assumes an original
// header size of 1 - confirm the two agree.
65 SharedBuffer<u8> makeOriginalPacket(
66 SharedBuffer<u8> data)
69 u32 packet_size = data.getSize() + header_size;
70 SharedBuffer<u8> b(packet_size);
72 writeU8(&b[0], TYPE_ORIGINAL);
74 memcpy(&b[header_size], *data, data.getSize());
// Cut `data` into TYPE_SPLIT chunks of at most chunksize_max bytes each
// (7-byte chunk header included) and collect them in a list.
// Chunk header layout as written below:
//   [0] u8  TYPE_SPLIT
//   [1] u16 seqnum
//   [3] u16 chunk_count (filled in by the second pass)
//   [5] u16 chunk_num
// NOTE(review): lines are missing from this listing (remaining parameters,
// the declarations of start/end/chunk_num/chunk_count, the loop opener and
// the return); comments below describe only what is visible.
// NOTE(review): `data.getSize() - 1` is evaluated on an unsigned size, so an
// empty `data` would wrap around; likewise chunksize_max <= 7 would wrap
// maximum_data_size. Confirm callers rule both out.
79 std::list<SharedBuffer<u8> > makeSplitPacket(
80 SharedBuffer<u8> data,
84 // Chunk packets, containing the TYPE_SPLIT header
85 std::list<SharedBuffer<u8> > chunks;
87 u32 chunk_header_size = 7;
88 u32 maximum_data_size = chunksize_max - chunk_header_size;
// `end` is the inclusive index of the last payload byte of this chunk,
// clamped to the last byte of `data`
94 end = start + maximum_data_size - 1;
95 if(end > data.getSize() - 1)
96 end = data.getSize() - 1;
98 u32 payload_size = end - start + 1;
99 u32 packet_size = chunk_header_size + payload_size;
101 SharedBuffer<u8> chunk(packet_size);
103 writeU8(&chunk[0], TYPE_SPLIT);
104 writeU16(&chunk[1], seqnum);
105 // [3] u16 chunk_count is written at next stage
106 writeU16(&chunk[5], chunk_num);
107 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
109 chunks.push_back(chunk);
// Keep chunking until the chunk just produced ends on the last byte
115 while(end != data.getSize() - 1);
// Second pass: the total chunk count is known now, write it into every chunk
117 for(std::list<SharedBuffer<u8> >::iterator i = chunks.begin();
118 i != chunks.end(); ++i)
121 writeU16(&((*i)[3]), chunk_count);
// Produce the list of packets needed to carry `data`: split packets when the
// data plus the 1-byte original header would exceed chunksize_max, otherwise
// a TYPE_ORIGINAL packet.
// NOTE(review): lines are missing from this listing (remaining parameters,
// the else branch opener, any update of split_seqnum and the return).
127 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
128 SharedBuffer<u8> data,
132 u32 original_header_size = 1;
133 std::list<SharedBuffer<u8> > list;
134 if(data.getSize() + original_header_size > chunksize_max)
136 list = makeSplitPacket(data, chunksize_max, split_seqnum);
// presumably the non-split case (else branch not visible) - TODO confirm
142 list.push_back(makeOriginalPacket(data));
// Wrap `data` in a TYPE_RELIABLE packet: type byte at offset 0, u16 seqnum at
// offset 1, then a copy of the payload at offset header_size.
// NOTE(review): the seqnum parameter, the definition of header_size and the
// return statement are not visible in this listing; the commented-out
// dstream lines are old debug output.
147 SharedBuffer<u8> makeReliablePacket(
148 SharedBuffer<u8> data,
151 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
152 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
153 <<((unsigned int)data[0]&0xff)<<std::endl;*/
155 u32 packet_size = data.getSize() + header_size;
156 SharedBuffer<u8> b(packet_size);
158 writeU8(&b[0], TYPE_RELIABLE);
159 writeU16(&b[1], seqnum);
161 memcpy(&b[header_size], *data, data.getSize());
163 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
164 <<((unsigned int)data[0]&0xff)<<std::endl;*/
165 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
173 ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {}
// Debug helper: walks the buffered packets and reads each one's sequence
// number (the u16 one byte past the base header).
// NOTE(review): the statement that outputs `s` is not visible in this listing.
175 void ReliablePacketBuffer::print()
177 for(std::list<BufferedPacket>::iterator i = m_list.begin();
181 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185 bool ReliablePacketBuffer::empty()
187 return m_list.empty();
// Number of packets in the buffer.
// NOTE(review): the body is not visible in this listing; presumably returns
// m_list_size (initialised to 0 by the constructor) - confirm.
189 u32 ReliablePacketBuffer::size()
// Linear search for the buffered packet carrying `seqnum`; each packet's
// sequence number is the u16 one byte past the base header.
// NOTE(review): the comparison and the return are not visible in this
// listing; presumably the iterator is returned, equal to notFound() on a
// miss (popSeqnum dereferences the result) - confirm.
193 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
195 std::list<BufferedPacket>::iterator i = m_list.begin();
196 for(; i != m_list.end(); ++i)
198 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
199 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
200 <<", comparing to s="<<s<<std::endl;*/
// Sentinel search result meaning "no such packet".
// NOTE(review): the body is not visible in this listing; presumably returns
// m_list.end() - confirm.
206 RPBSearchResult ReliablePacketBuffer::notFound()
// Sequence number of the first buffered packet.
// Throws NotFoundException("Buffer is empty"); the guarding condition is not
// visible in this listing (presumably an empty() check - confirm).
// NOTE(review): the first packet is copied into `p` just to read two bytes;
// reading through the iterator would avoid the copy.
210 u16 ReliablePacketBuffer::getFirstSeqnum()
213 throw NotFoundException("Buffer is empty");
214 BufferedPacket p = *m_list.begin();
215 return readU16(&p.data[BASE_HEADER_SIZE+1]);
// Remove the first packet from the buffer; a copy of it is taken before the
// erase.
// Throws NotFoundException("Buffer is empty"); the guarding condition, any
// m_list_size bookkeeping and the return are not visible in this listing.
217 BufferedPacket ReliablePacketBuffer::popFirst()
220 throw NotFoundException("Buffer is empty");
221 BufferedPacket p = *m_list.begin();
222 m_list.erase(m_list.begin());
// Look up the packet carrying `seqnum` and take a copy of it.
// Logs "Not found" and throws NotFoundException when there is no such packet
// (the guarding condition is not visible in this listing).
// NOTE(review): the erase, any m_list_size bookkeeping and the return are not
// visible here - presumably the packet is removed and returned; confirm.
226 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
228 RPBSearchResult r = findPacket(seqnum);
230 dout_con<<"Not found"<<std::endl;
231 throw NotFoundException("seqnum not found in buffer");
233 BufferedPacket p = *r;
// Insert a TYPE_RELIABLE packet into the buffer, keeping the list ordered by
// sequence number (as decided by seqnum_higher()).
// Asserts that the packet is large enough to hold base header + type + seqnum
// and that its type byte is TYPE_RELIABLE.
// Throws AlreadyExistsException("Same seqnum in list") for a duplicate
// sequence number (the guarding comparison is not visible in this listing).
// NOTE(review): the actual list insertions and any m_list_size bookkeeping
// are among the lines missing from this listing.
238 void ReliablePacketBuffer::insert(BufferedPacket &p)
240 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
241 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
242 assert(type == TYPE_RELIABLE);
243 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
246 // Find the right place for the packet and insert it there
248 // If list is empty, just add it
255 // Otherwise find the right place
256 std::list<BufferedPacket>::iterator i = m_list.begin();
257 // Find the first packet in the list which has a higher seqnum
258 for(; i != m_list.end(); ++i){
259 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
262 throw AlreadyExistsException("Same seqnum in list");
264 if(seqnum_higher(s, seqnum)){
268 // If we're at the end of the list, add the packet to the
270 if(i == m_list.end())
// Advance the timers of every buffered packet by `dtime`.
// NOTE(review): only the totaltime update is visible in this listing;
// presumably the per-resend `time` counter (read by getTimedOuts and
// resetTimedOuts) is advanced on the missing line - confirm.
280 void ReliablePacketBuffer::incrementTimeouts(float dtime)
282 for(std::list<BufferedPacket>::iterator i = m_list.begin();
283 i != m_list.end(); ++i)
286 i->totaltime += dtime;
// Visit every buffered packet whose `time` has reached `timeout`.
// NOTE(review): the action taken on a match is not visible in this listing;
// judging by the name, presumably `time` is reset to zero - confirm.
290 void ReliablePacketBuffer::resetTimedOuts(float timeout)
292 for(std::list<BufferedPacket>::iterator i = m_list.begin();
293 i != m_list.end(); ++i)
295 if(i->time >= timeout)
// Check whether any buffered packet's `totaltime` has reached `timeout`.
// NOTE(review): the return statements are not visible in this listing;
// presumably true on the first match and false otherwise - confirm.
300 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
302 for(std::list<BufferedPacket>::iterator i = m_list.begin();
303 i != m_list.end(); ++i)
305 if(i->totaltime >= timeout)
// Collect copies of all buffered packets whose `time` has reached `timeout`.
// The packets stay in the buffer.
// NOTE(review): the return statement is not visible in this listing;
// presumably `timed_outs` is returned - confirm.
311 std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
313 std::list<BufferedPacket> timed_outs;
314 for(std::list<BufferedPacket>::iterator i = m_list.begin();
315 i != m_list.end(); ++i)
317 if(i->time >= timeout)
318 timed_outs.push_back(*i);
// Destructor: walks every entry of m_buf.
// NOTE(review): the loop body is not visible in this listing; since insert()
// allocates the IncomingSplitPacket objects with `new`, presumably each one
// is deleted here - confirm.
327 IncomingSplitBuffer::~IncomingSplitBuffer()
329 for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
330 i != m_buf.end(); ++i)
336 Returns the fully reassembled data once every chunk of a split packet
337 has been received; until then an empty buffer is returned.
// Store one TYPE_SPLIT chunk and, once all chunks of its sequence number
// have arrived, assemble them into one contiguous buffer.
//   p        - full packet (base header + 7-byte split header + chunk data)
//   reliable - whether the chunk arrived inside a reliable packet
// Returns an empty SharedBuffer when the chunk is a duplicate or the split
// packet is still incomplete.
// Asserts on the packet size and on the type byte being TYPE_SPLIT.
// NOTE(review): lines are missing from this listing (insertion of a new
// entry into m_buf, the totalsize/start declarations, removal of the
// finished entry and the final return).
339 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
341 u32 headersize = BASE_HEADER_SIZE + 7;
342 assert(p.data.getSize() >= headersize);
343 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
344 assert(type == TYPE_SPLIT);
// Split header fields: seqnum at +1, chunk_count at +3, chunk_num at +5
345 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
346 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
347 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
349 // Add if doesn't exist
350 if(m_buf.find(seqnum) == m_buf.end())
352 IncomingSplitPacket *sp = new IncomingSplitPacket();
353 sp->chunk_count = chunk_count;
354 sp->reliable = reliable;
358 IncomingSplitPacket *sp = m_buf[seqnum];
// A mismatch with the values recorded for this seqnum is only logged;
// processing continues regardless
360 // TODO: These errors should be thrown or something? Dunno.
361 if(chunk_count != sp->chunk_count)
362 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
363 <<" != sp->chunk_count="<<sp->chunk_count
365 if(reliable != sp->reliable)
366 derr_con<<"Connection: WARNING: reliable="<<reliable
367 <<" != sp->reliable="<<sp->reliable
370 // If chunk already exists, ignore it.
371 // Sometimes two identical packets may arrive when there is network
372 // lag and the server re-sends stuff.
373 if(sp->chunks.find(chunk_num) != sp->chunks.end())
374 return SharedBuffer<u8>();
376 // Cut chunk data out of packet
377 u32 chunkdatasize = p.data.getSize() - headersize;
378 SharedBuffer<u8> chunkdata(chunkdatasize);
379 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
381 // Set chunk data in buffer
382 sp->chunks[chunk_num] = chunkdata;
384 // If not all chunks are received, return empty buffer
385 if(sp->allReceived() == false)
386 return SharedBuffer<u8>();
388 // Calculate total size
390 for(std::map<u16, SharedBuffer<u8> >::iterator i = sp->chunks.begin();
391 i != sp->chunks.end(); ++i)
393 totalsize += i->second.getSize();
396 SharedBuffer<u8> fulldata(totalsize);
398 // Copy chunks to data buffer
400 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
403 SharedBuffer<u8> buf = sp->chunks[chunk_i];
// NOTE(review): the size is narrowed to u16 here although chunk sizes are
// stored and summed as u32 above; a chunk larger than 65535 bytes would be
// copied only partially - confirm that cannot occur.
404 u16 chunkdatasize = buf.getSize();
405 memcpy(&fulldata[start], *buf, chunkdatasize);
406 start += chunkdatasize;;
409 // Remove sp from buffer
// Drop incomplete split packets that were received unreliably and whose
// `time` has reached `timeout`. Reliable split packets are never dropped.
// Works in two passes: matching keys are collected first, then handled from
// remove_queue, so m_buf is not modified while it is being iterated.
// NOTE(review): lines are missing from this listing (the skip after the
// reliable check, any use of `dtime` to advance p->time, and the actual
// delete/erase in the second loop).
415 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
417 std::list<u16> remove_queue;
418 for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
419 i != m_buf.end(); ++i)
421 IncomingSplitPacket *p = i->second;
422 // Reliable ones are not removed by timeout
423 if(p->reliable == true)
426 if(p->time >= timeout)
427 remove_queue.push_back(i->first);
429 for(std::list<u16>::iterator j = remove_queue.begin();
430 j != remove_queue.end(); ++j)
432 dout_con<<"NOTE: Removing timed out unreliable split packet"
// Start all three sequence counters at SEQNUM_INITIAL.
// NOTE(review): the enclosing function header is not visible in this
// listing; these are the counters accessed through Channel pointers
// elsewhere in the file, so this is presumably the Channel constructor
// body - confirm.
445 next_outgoing_seqnum = SEQNUM_INITIAL;
446 next_incoming_seqnum = SEQNUM_INITIAL;
447 next_outgoing_split_seqnum = SEQNUM_INITIAL;
// Construct a peer with the given id and address.
// Visible defaults: timeout counter 0, has_sent_with_id false, an initial
// send rate of 10 packets per second, and congestion control parameters
// aim_rtt 0.2, max_rate 400, min_rate 10 (runTimeouts() later overwrites the
// three congestion control values from settings).
// NOTE(review): several member initialisers and the body are missing from
// this listing.
457 Peer::Peer(u16 a_id, Address a_address):
460 timeout_counter(0.0),
464 has_sent_with_id(false),
466 m_max_packets_per_second(10),
469 congestion_control_aim_rtt(0.2),
470 congestion_control_max_rate(400),
471 congestion_control_min_rate(10)
// Feed one round-trip-time sample into the peer's congestion control and
// resend timeout.
// Visible behaviour:
//  - the send rate is raised by 10 or by 2 while it is below
//    congestion_control_max_rate, or multiplied by 0.8 and floored at
//    congestion_control_min_rate;
//  - avg_rtt is updated as 10% new sample + 90% previous average;
//  - resend_timeout = avg_rtt * RESEND_TIMEOUT_FACTOR, clamped to
//    [RESEND_TIMEOUT_MIN, RESEND_TIMEOUT_MAX].
// NOTE(review): several conditions are missing from this listing (the first
// `if` of the rate chain, the `else` before the 0.8 scaling, and the
// branches around the avg_rtt update), so the exact thresholds for the +10
// step and the handling of a negative avg_rtt cannot be stated from here.
478 void Peer::reportRTT(float rtt)
482 if(m_max_packets_per_second < congestion_control_max_rate)
483 m_max_packets_per_second += 10;
484 } else if(rtt < congestion_control_aim_rtt){
485 if(m_max_packets_per_second < congestion_control_max_rate)
486 m_max_packets_per_second += 2;
488 m_max_packets_per_second *= 0.8;
489 if(m_max_packets_per_second < congestion_control_min_rate)
490 m_max_packets_per_second = congestion_control_min_rate;
496 else if(avg_rtt < 0.0)
499 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
501 // Calculate resend_timeout
503 /*int reliable_count = 0;
504 for(int i=0; i<CHANNEL_COUNT; i++)
506 reliable_count += channels[i].outgoing_reliables.size();
508 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
509 * ((float)reliable_count * 1);*/
511 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
512 if(timeout < RESEND_TIMEOUT_MIN)
513 timeout = RESEND_TIMEOUT_MIN;
514 if(timeout > RESEND_TIMEOUT_MAX)
515 timeout = RESEND_TIMEOUT_MAX;
516 resend_timeout = timeout;
// Construct a connection without a peer handler (m_bc_peerhandler is NULL).
// Stores the protocol id and maximum packet size and sets the socket timeout
// to 5 ms.
// NOTE(review): some member initialisers (including whatever consumes
// `timeout`) and the rest of the body are missing from this listing.
523 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
524 m_protocol_id(protocol_id),
525 m_max_packet_size(max_packet_size),
528 m_bc_peerhandler(NULL),
529 m_bc_receive_timeout(0),
532 m_socket.setTimeoutMs(5);
// Construct a connection that keeps a pointer to `peerhandler` in
// m_bc_peerhandler. Otherwise identical, as far as visible, to the
// three-argument constructor: stores the protocol id and maximum packet size
// and sets the socket timeout to 5 ms.
// NOTE(review): some member initialisers (including whatever consumes
// `timeout`) and the rest of the body are missing from this listing.
537 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
538 PeerHandler *peerhandler):
539 m_protocol_id(protocol_id),
540 m_max_packet_size(max_packet_size),
543 m_bc_peerhandler(peerhandler),
544 m_bc_receive_timeout(0),
547 m_socket.setTimeoutMs(5);
// Destructor: walks every entry of m_peers.
// NOTE(review): the statements before the loop and the loop body are not
// visible in this listing; since peers are allocated with `new` in receive()
// and connect(), presumably each Peer is deleted here - confirm.
553 Connection::~Connection()
557 for(std::map<u16, Peer*>::iterator
559 j != m_peers.end(); ++j)
// Thread entry point of the connection.
// Visible behaviour: registers the thread for logging as "Connection",
// measures the time elapsed since the previous iteration in seconds (dtime)
// and drains m_command_queue, all inside the debug exception handler macros.
// NOTE(review): lines are missing from this listing (the main loop header,
// the update of lasttime, the handling of each popped command and any calls
// that consume dtime, and the return).
567 void * Connection::Thread()
570 log_register_thread("Connection");
572 dout_con<<"Connection thread started"<<std::endl;
574 u32 curtime = porting::getTimeMs();
575 u32 lasttime = curtime;
579 BEGIN_DEBUG_EXCEPTION_HANDLER
582 curtime = porting::getTimeMs();
// Milliseconds to seconds
583 float dtime = (float)(curtime - lasttime) / 1000.;
591 while(!m_command_queue.empty()){
592 ConnectionCommand c = m_command_queue.pop_front();
600 END_DEBUG_EXCEPTION_HANDLER(derr_con);
606 void Connection::putEvent(ConnectionEvent &e)
608 assert(e.type != CONNEVENT_NONE);
609 m_event_queue.push_back(e);
// Dispatch one queued ConnectionCommand to the matching private operation,
// logging the command type first.
// Visible dispatches: CONNCMD_SEND -> send(), CONNCMD_SEND_TO_ALL ->
// sendToAll(), CONNCMD_DELETE_PEER -> deletePeer(peer_id, false).
// NOTE(review): lines are missing from this listing (the switch header, some
// case labels, the calls made for SERVE/CONNECT/DISCONNECT and every
// return/break).
612 void Connection::processCommand(ConnectionCommand &c)
616 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
619 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
623 case CONNCMD_CONNECT:
624 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
627 case CONNCMD_DISCONNECT:
628 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
632 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
633 send(c.peer_id, c.channelnum, c.data, c.reliable);
635 case CONNCMD_SEND_TO_ALL:
636 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
637 sendToAll(c.channelnum, c.data, c.reliable);
639 case CONNCMD_DELETE_PEER:
640 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
641 deletePeer(c.peer_id, false);
// Rate-limited flush of m_outgoing_queue.
//  1. Each peer's send budget for this call is computed from its accumulated
//     send time and m_max_packets_per_second.
//  2. Queued packets are sent with rawSendAsPacket() while the peer has
//     budget left; packets for a channel that already has 5 or more
//     unacknowledged reliables, or for a peer without budget, are postponed.
//  3. Postponed packets are put back on the queue in their original order.
//  4. The time spent on the packets sent is subtracted from each peer's
//     accumulator, which is capped at the time equivalent of 10 packets.
// NOTE(review): lines are missing from this listing (loop initialisers, the
// handling of a NULL peer from getPeerNoEx(), the m_num_sent increment and
// the final else).
646 void Connection::send(float dtime)
648 for(std::map<u16, Peer*>::iterator
650 j != m_peers.end(); ++j)
652 Peer *peer = j->second;
653 peer->m_sendtime_accu += dtime;
654 peer->m_num_sent = 0;
655 peer->m_max_num_sent = peer->m_sendtime_accu *
656 peer->m_max_packets_per_second;
658 Queue<OutgoingPacket> postponed_packets;
659 while(!m_outgoing_queue.empty()){
660 OutgoingPacket packet = m_outgoing_queue.pop_front();
661 Peer *peer = getPeerNoEx(packet.peer_id);
// Too many reliables still in flight on this channel: hold the packet back
664 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
665 postponed_packets.push_back(packet);
666 } else if(peer->m_num_sent < peer->m_max_num_sent){
667 rawSendAsPacket(packet.peer_id, packet.channelnum,
668 packet.data, packet.reliable);
671 postponed_packets.push_back(packet);
674 while(!postponed_packets.empty()){
675 m_outgoing_queue.push_back(postponed_packets.pop_front());
677 for(std::map<u16, Peer*>::iterator
679 j != m_peers.end(); ++j)
681 Peer *peer = j->second;
682 peer->m_sendtime_accu -= (float)peer->m_num_sent /
683 peer->m_max_packets_per_second;
684 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
685 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
689 // Receive packets from the network and buffers and create ConnectionEvents
// Outline of the visible logic:
//  - data already available in the per-channel incoming buffers is delivered
//    first (getFromBuffers);
//  - otherwise one datagram is read from the socket and checked for minimum
//    size, protocol id and channel number;
//  - a sender using PEER_ID_INEXISTENT is either matched to an existing peer
//    by address or registered as a new peer, which is told its id through a
//    reliable CONTROLTYPE_SET_PEER_ID packet;
//  - the sender address is validated against the peer entry, the peer's
//    timeout counter is reset, the base header is stripped and the rest is
//    handed to processPacket(); returned data becomes a dataReceived event.
// InvalidIncomingDataException and ProcessedSilentlyException are caught
// inside this function.
// NOTE(review): many lines are missing from this listing (loop and try
// openers, early exits after the guards, declarations of peer_id/sender/e,
// the peer-id search loop, the catch bodies); comments describe visible
// statements only. The unindented sentences below are remnants of block
// comments whose delimiters are not visible here.
690 void Connection::receive()
692 u32 datasize = m_max_packet_size * 2; // Double it just to be safe
693 // TODO: We can not know how many layers of header there are.
694 // For now, just assume there are no other than the base headers.
695 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
696 SharedBuffer<u8> packetdata(packet_maxsize);
698 bool single_wait_done = false;
703 /* Check if some buffer has relevant data */
706 SharedBuffer<u8> resultdata;
707 bool got = getFromBuffers(peer_id, resultdata);
710 e.dataReceived(peer_id, resultdata);
// After the first pass, only poll the socket without waiting (timeout 0)
716 if(single_wait_done){
717 if(m_socket.WaitData(0) == false)
721 single_wait_done = true;
724 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
726 if(received_size < 0)
728 if(received_size < BASE_HEADER_SIZE)
730 if(readU32(&packetdata[0]) != m_protocol_id)
733 u16 peer_id = readPeerId(*packetdata);
734 u8 channelnum = readChannel(*packetdata);
735 if(channelnum > CHANNEL_COUNT-1){
// NOTE(review): channelnum is a u8; if u8 is a character type the stream
// prints it as a character rather than a number (other log lines in this
// file cast to int first) - confirm and consider casting.
737 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
738 throw InvalidIncomingDataException("Channel doesn't exist");
741 if(peer_id == PEER_ID_INEXISTENT)
744 Somebody is trying to send stuff to us with no peer id.
746 Check if the same address and port was added to our peer
748 Allow only entries that have has_sent_with_id==false.
751 std::map<u16, Peer*>::iterator j;
753 for(; j != m_peers.end(); ++j)
755 Peer *peer = j->second;
756 if(peer->has_sent_with_id)
758 if(peer->address == sender)
763 If no peer was found with the same address and port,
764 we shall assume it is a new peer and create an entry.
766 if(j == m_peers.end())
768 // Pass on to adding the peer
770 // Else: A peer was found.
773 Peer *peer = j->second;
776 derr_con<<"WARNING: Assuming unknown peer to be "
777 <<"peer_id="<<peer_id<<std::endl;
782 The peer was not found in our lists. Add it.
784 if(peer_id == PEER_ID_INEXISTENT)
786 // Somebody wants to make a new connection
788 // Get a unique peer id (2 or higher)
791 Find an unused peer id
793 bool out_of_ids = false;
797 if(m_peers.find(peer_id_new) == m_peers.end())
799 // Check for overflow
800 if(peer_id_new == 65535){
807 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
812 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
813 " giving peer_id="<<peer_id_new<<std::endl;
// Register the new peer; the map owns the raw pointer from here on
816 Peer *peer = new Peer(peer_id_new, sender);
817 m_peers[peer->id] = peer;
819 // Create peer addition event
821 e.peerAdded(peer_id_new, sender);
824 // Create CONTROL packet to tell the peer id to the new peer.
825 SharedBuffer<u8> reply(4);
826 writeU8(&reply[0], TYPE_CONTROL);
827 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
828 writeU16(&reply[2], peer_id_new);
829 sendAsPacket(peer_id_new, 0, reply, true);
831 // We're now talking to a valid peer_id
832 peer_id = peer_id_new;
834 // Go on and process whatever it sent
837 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
839 if(node == m_peers.end())
842 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
843 // and it is invalid.
845 derr_con<<"Receive(): Peer not found"<<std::endl;
846 throw InvalidIncomingDataException("Peer not found (possible timeout)");
849 Peer *peer = node->second;
851 // Validate peer address
852 if(peer->address != sender)
855 derr_con<<"Peer "<<peer_id<<" sending from different address."
856 " Ignoring."<<std::endl;
// Any valid packet from the peer counts as a sign of life
860 peer->timeout_counter = 0.0;
862 Channel *channel = &(peer->channels[channelnum]);
864 // Throw the received packet to channel->processPacket()
866 // Make a new SharedBuffer from the data without the base headers
867 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
868 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
869 strippeddata.getSize());
872 // Process it (the result is some data with no headers made by us)
873 SharedBuffer<u8> resultdata = processPacket
874 (channel, strippeddata, peer_id, channelnum, false);
877 dout_con<<"ProcessPacket returned data of size "
878 <<resultdata.getSize()<<std::endl;
881 e.dataReceived(peer_id, resultdata);
884 }catch(ProcessedSilentlyException &e){
886 }catch(InvalidIncomingDataException &e){
888 catch(ProcessedSilentlyException &e){
// Periodic housekeeping, called with the time elapsed since the last call.
// For every peer:
//  - refresh the congestion control parameters from g_settings;
//  - advance the peer timeout counter and mark the peer for removal once it
//    exceeds m_timeout;
//  - per channel: expire unreliable incomplete split packets, advance the
//    reliable packet timers, mark the peer for removal if any reliable packet
//    has been pending for m_timeout in total, and look up the reliables whose
//    resend timeout has passed (reporting the timeout as an RTT sample);
//  - send a reliable PING once ping_timer reaches 5 seconds.
// Peers marked for removal are deleted at the end with deletePeer(id, true).
// NOTE(review): lines are missing from this listing (continue/goto after a
// timeout, the actual re-send call, loop closers).
893 void Connection::runTimeouts(float dtime)
// The three settings are read once per call, not once per peer
895 float congestion_control_aim_rtt
896 = g_settings->getFloat("congestion_control_aim_rtt");
897 float congestion_control_max_rate
898 = g_settings->getFloat("congestion_control_max_rate");
899 float congestion_control_min_rate
900 = g_settings->getFloat("congestion_control_min_rate");
902 std::list<u16> timeouted_peers;
903 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
904 j != m_peers.end(); ++j)
906 Peer *peer = j->second;
908 // Update congestion control values
909 peer->congestion_control_aim_rtt = congestion_control_aim_rtt;
910 peer->congestion_control_max_rate = congestion_control_max_rate;
911 peer->congestion_control_min_rate = congestion_control_min_rate;
916 peer->timeout_counter += dtime;
917 if(peer->timeout_counter > m_timeout)
920 derr_con<<"RunTimeouts(): Peer "<<peer->id
922 <<" (source=peer->timeout_counter)"
924 // Add peer to the list
925 timeouted_peers.push_back(peer->id);
926 // Don't bother going through the buffers of this one
// Cached so that reportRTT() calls below do not change the timeout used for
// the remaining channels of this peer
930 float resend_timeout = peer->resend_timeout;
931 for(u16 i=0; i<CHANNEL_COUNT; i++)
933 std::list<BufferedPacket> timed_outs;
935 Channel *channel = &peer->channels[i];
937 // Remove timed out incomplete unreliable split packets
938 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
940 // Increment reliable packet times
941 channel->outgoing_reliables.incrementTimeouts(dtime);
943 // Check reliable packet total times, remove peer if
945 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
948 derr_con<<"RunTimeouts(): Peer "<<peer->id
950 <<" (source=reliable packet totaltime)"
952 // Add peer to the to-be-removed list
953 timeouted_peers.push_back(peer->id);
957 // Re-send timed out outgoing reliables
959 timed_outs = channel->
960 outgoing_reliables.getTimedOuts(resend_timeout);
962 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
// NOTE(review): this `j` shadows the peer iterator `j` of the outer loop,
// and the `u8 channel` below shadows the `Channel *channel` declared above.
964 for(std::list<BufferedPacket>::iterator j = timed_outs.begin();
965 j != timed_outs.end(); ++j)
967 u16 peer_id = readPeerId(*(j->data));
968 u8 channel = readChannel(*(j->data));
969 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
972 derr_con<<"RE-SENDING timed-out RELIABLE to ";
973 j->address.print(&derr_con);
974 derr_con<<"(t/o="<<resend_timeout<<"): "
975 <<"from_peer_id="<<peer_id
976 <<", channel="<<((int)channel&0xff)
977 <<", seqnum="<<seqnum
982 // Enlarge avg_rtt and resend_timeout:
983 // The rtt will be at least the timeout.
984 // NOTE: This won't affect the timeout of the next
985 // checked channel because it was cached.
986 peer->reportRTT(resend_timeout);
993 peer->ping_timer += dtime;
994 if(peer->ping_timer >= 5.0)
996 // Create and send PING packet
997 SharedBuffer<u8> data(2);
998 writeU8(&data[0], TYPE_CONTROL);
999 writeU8(&data[1], CONTROLTYPE_PING);
1000 rawSendAsPacket(peer->id, 0, data, true);
1002 peer->ping_timer = 0.0;
// Deletion is deferred to here so m_peers is not modified while iterated
1009 // Remove timed out peers
1010 for(std::list<u16>::iterator i = timeouted_peers.begin();
1011 i != timeouted_peers.end(); ++i)
1013 PrintInfo(derr_con);
1014 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1015 deletePeer(*i, true);
// Start acting as a server: bind the socket to `port` and take the peer id
// PEER_ID_SERVER. A SocketException is caught.
// NOTE(review): the try opener and the body of the catch handler are not
// visible in this listing, so how a bind failure is reported cannot be
// stated from here.
1019 void Connection::serve(u16 port)
1021 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1023 m_socket.Bind(port);
1024 m_peer_id = PEER_ID_SERVER;
1026 catch(SocketException &e){
// Start connecting to a server at `address`.
// Registers the server as peer PEER_ID_SERVER, emits a peerAdded event
// payload, resets the own peer id to PEER_ID_INEXISTENT and sends an empty
// reliable packet on channel 0 so the server notices us.
// Throws ConnectionException("Already connected to a server") when a peer
// with id PEER_ID_SERVER already exists.
// NOTE(review): lines are missing from this listing (the declaration and
// queuing of the event `e`, and any socket setup).
1034 void Connection::connect(Address address)
1036 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1037 <<":"<<address.getPort()<<std::endl;
1039 std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1040 if(node != m_peers.end()){
1041 throw ConnectionException("Already connected to a server");
// The map owns the raw pointer from here on
1044 Peer *peer = new Peer(PEER_ID_SERVER, address);
1045 m_peers[peer->id] = peer;
1049 e.peerAdded(peer->id, peer->address);
1054 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1055 m_peer_id = PEER_ID_INEXISTENT;
1056 SharedBuffer<u8> data(0);
1057 Send(PEER_ID_SERVER, 0, data, true);
1060 void Connection::disconnect()
1062 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1064 // Create and send DISCO packet
1065 SharedBuffer<u8> data(2);
1066 writeU8(&data[0], TYPE_CONTROL);
1067 writeU8(&data[1], CONTROLTYPE_DISCO);
1070 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1071 j != m_peers.end(); ++j)
1073 Peer *peer = j->second;
1074 rawSendAsPacket(peer->id, 0, data, false);
1078 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1080 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1081 j != m_peers.end(); ++j)
1083 Peer *peer = j->second;
1084 send(peer->id, channelnum, data, reliable);
// Queue `data` for one peer: the data is turned into one original packet or
// several split packets that fit into the maximum packet size, and each
// resulting packet is queued with sendAsPacket().
// Asserts that channelnum is a valid channel index.
// NOTE(review): lines are missing from this listing (the handling of a NULL
// peer from getPeerNoEx() and the condition under which
// RELIABLE_HEADER_SIZE is subtracted - presumably `if(reliable)`; confirm).
1088 void Connection::send(u16 peer_id, u8 channelnum,
1089 SharedBuffer<u8> data, bool reliable)
1091 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1093 assert(channelnum < CHANNEL_COUNT);
1095 Peer *peer = getPeerNoEx(peer_id);
1098 Channel *channel = &(peer->channels[channelnum]);
// Room left for our own headers and payload inside one network packet
1100 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1102 chunksize_max -= RELIABLE_HEADER_SIZE;
1104 std::list<SharedBuffer<u8> > originals;
1105 originals = makeAutoSplitPacket(data, chunksize_max,
1106 channel->next_outgoing_split_seqnum);
1108 for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
1109 i != originals.end(); ++i)
1111 SharedBuffer<u8> original = *i;
1113 sendAsPacket(peer_id, channelnum, original, reliable);
1117 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1118 SharedBuffer<u8> data, bool reliable)
1120 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1121 m_outgoing_queue.push_back(packet);
// Build the final network packet for `data`, bypassing m_outgoing_queue.
// Reliable path (visible part): take the channel's next outgoing sequence
// number, wrap the data in a TYPE_RELIABLE packet, add the base header and
// store the packet in the channel's outgoing_reliables buffer so it can be
// re-sent until acknowledged. A duplicate sequence number in that buffer is
// only logged as a warning.
// Unreliable path (visible part): add the base header only.
// NOTE(review): lines are missing from this listing (the handling of a NULL
// peer from getPeerNoEx(), the reliable/unreliable branch, the try opener
// and the rawSend() calls). NOTE(review): the local named `reliable` at
// embedded line 1137 has the same name as the bool parameter; the scoping
// that makes this legal is not visible here.
1124 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1125 SharedBuffer<u8> data, bool reliable)
1127 Peer *peer = getPeerNoEx(peer_id);
1130 Channel *channel = &(peer->channels[channelnum]);
1134 u16 seqnum = channel->next_outgoing_seqnum;
1135 channel->next_outgoing_seqnum++;
1137 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1139 // Add base headers and make a packet
1140 BufferedPacket p = makePacket(peer->address, reliable,
1141 m_protocol_id, m_peer_id, channelnum);
1144 // Buffer the packet
1145 channel->outgoing_reliables.insert(p);
1147 catch(AlreadyExistsException &e)
1149 PrintInfo(derr_con);
1150 derr_con<<"WARNING: Going to send a reliable packet "
1151 "seqnum="<<seqnum<<" that is already "
1152 "in outgoing buffer"<<std::endl;
1161 // Add base headers and make a packet
1162 BufferedPacket p = makePacket(peer->address, data,
1163 m_protocol_id, m_peer_id, channelnum);
1170 void Connection::rawSend(const BufferedPacket &packet)
1173 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1174 } catch(SendFailedException &e){
1175 derr_con<<"Connection::rawSend(): SendFailedException: "
1176 <<packet.address.serializeString()<<std::endl;
1180 Peer* Connection::getPeer(u16 peer_id)
1182 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1184 if(node == m_peers.end()){
1185 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1189 assert(node->second->id == peer_id);
1191 return node->second;
// Look up a peer by id without throwing.
// NOTE(review): the statement executed when the peer is missing is not
// visible in this listing; presumably NULL is returned - confirm, since the
// visible callers dereference the result.
1194 Peer* Connection::getPeerNoEx(u16 peer_id)
1196 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1198 if(node == m_peers.end()){
// Sanity check: the map key must match the id stored in the peer
1203 assert(node->second->id == peer_id);
1205 return node->second;
// Collect the pointers of all registered peers into a list, in map (peer id)
// order. The Peer objects themselves are not copied.
// NOTE(review): the return statement is not visible in this listing;
// presumably `list` is returned - confirm.
1208 std::list<Peer*> Connection::getPeers()
1210 std::list<Peer*> list;
1211 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1212 j != m_peers.end(); ++j)
1214 Peer *peer = j->second;
1215 list.push_back(peer);
// Scan every channel of every peer for buffered incoming data that is ready
// to be delivered, using checkIncomingBuffers().
// NOTE(review): lines are missing from this listing (the try/catch around the
// check, the copy of resultdata into `dst` and the return statements);
// presumably returns true with `peer_id` and `dst` filled in on the first
// hit and false otherwise - confirm.
1220 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1222 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1223 j != m_peers.end(); ++j)
1225 Peer *peer = j->second;
1226 for(u16 i=0; i<CHANNEL_COUNT; i++)
1228 Channel *channel = &peer->channels[i];
1229 SharedBuffer<u8> resultdata;
1230 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
// Deliver the next in-order reliable packet from a channel's
// incoming_reliables buffer, if it is available.
//  1. Packets at the front of the buffer that are older than the expected
//     sequence number are discarded.
//  2. If the first remaining packet carries exactly next_incoming_seqnum, it
//     is popped, the expected sequence number is advanced, the base and
//     reliable headers are stripped and the payload is run through
//     processPacket(); the result is stored in `dst` and the sender id in
//     `peer_id`.
// NOTE(review): lines are missing from this listing (the loop and try
// openers of step 1, its break, the catch body and the return statements).
1240 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1241 SharedBuffer<u8> &dst)
1243 u16 firstseqnum = 0;
1244 // Clear old packets from start of buffer
1247 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1248 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1249 channel->incoming_reliables.popFirst();
1253 // This happens if all packets are old
1254 }catch(con::NotFoundException)
1257 if(channel->incoming_reliables.empty() == false)
1259 if(firstseqnum == channel->next_incoming_seqnum)
1261 BufferedPacket p = channel->incoming_reliables.popFirst();
1263 peer_id = readPeerId(*p.data);
1264 u8 channelnum = readChannel(*p.data);
1265 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1268 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1269 <<" seqnum="<<seqnum
1270 <<" peer_id="<<peer_id
1271 <<" channel="<<((int)channelnum&0xff)
1274 channel->next_incoming_seqnum++;
1276 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1277 // Get out the inside packet and re-process it
1278 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1279 memcpy(*payload, &p.data[headers_size], payload.getSize());
1281 dst = processPacket(channel, payload, peer_id, channelnum, true);
// Interpret one packet whose base header has already been stripped and
// return the user payload it carries, if any.
//   channel    - channel state of the sending peer
//   packetdata - packet contents starting at the type byte
//   peer_id    - id of the sending peer
//   channelnum - channel index the packet arrived on
//   reliable   - true when this data was unwrapped from a TYPE_RELIABLE
// Visible behaviour per type:
//   TYPE_CONTROL  - ACK: pop the acknowledged packet from outgoing_reliables
//                   and report its total time as RTT; SET_PEER_ID: adopt the
//                   id unless one is already set; PING: nothing to do;
//                   DISCO: delete the peer. All of these end with a
//                   ProcessedSilentlyException.
//   TYPE_ORIGINAL - payload after the original header is extracted.
//   TYPE_SPLIT    - chunk is stored in incoming_splits; the full data is
//                   logged as returned once complete, otherwise
//                   ProcessedSilentlyException is thrown.
//   TYPE_RELIABLE - an ACK is sent back immediately; future packets are
//                   buffered (ProcessedSilentlyException), old ones are
//                   rejected (InvalidIncomingDataException), the expected
//                   one is unwrapped and processed recursively.
// Throws InvalidIncomingDataException for truncated or unknown packets.
// getPeer() may additionally throw PeerNotFoundException.
// NOTE(review): lines are missing from this listing (braces, else/try
// openers, remaining makePacket() arguments, the return statements of the
// ORIGINAL and SPLIT branches, and some comment openers - the stray comment
// closers further down belong to debug code whose opener is not visible).
1288 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1289 SharedBuffer<u8> packetdata, u16 peer_id,
1290 u8 channelnum, bool reliable)
1292 IndentationRaiser iraiser(&(m_indentation));
1294 if(packetdata.getSize() < 1)
1295 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1297 u8 type = readU8(&packetdata[0]);
1299 if(type == TYPE_CONTROL)
1301 if(packetdata.getSize() < 2)
1302 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1304 u8 controltype = readU8(&packetdata[1]);
1306 if(controltype == CONTROLTYPE_ACK)
1308 if(packetdata.getSize() < 4)
1309 throw InvalidIncomingDataException
1310 ("packetdata.getSize() < 4 (ACK header size)");
1312 u16 seqnum = readU16(&packetdata[2]);
1314 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1315 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1316 <<", seqnum="<<seqnum<<std::endl;
// The acknowledged packet no longer needs to be kept for re-sending
1319 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1320 // Get round trip time
1321 float rtt = p.totaltime;
1323 // Let peer calculate stuff according to it
1324 // (avg_rtt and resend_timeout)
1325 Peer *peer = getPeer(peer_id);
1326 peer->reportRTT(rtt);
1328 //PrintInfo(dout_con);
1329 //dout_con<<"RTT = "<<rtt<<std::endl;
1331 /*dout_con<<"OUTGOING: ";
1333 channel->outgoing_reliables.print();
1334 dout_con<<std::endl;*/
1336 catch(NotFoundException &e){
1337 PrintInfo(derr_con);
1338 derr_con<<"WARNING: ACKed packet not "
1343 throw ProcessedSilentlyException("Got an ACK");
1345 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1347 if(packetdata.getSize() < 4)
1348 throw InvalidIncomingDataException
1349 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1350 u16 peer_id_new = readU16(&packetdata[2]);
1352 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1354 if(GetPeerID() != PEER_ID_INEXISTENT)
1356 PrintInfo(derr_con);
1357 derr_con<<"WARNING: Not changing"
1358 " existing peer id."<<std::endl;
1362 dout_con<<"changing."<<std::endl;
1363 SetPeerID(peer_id_new);
1365 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1367 else if(controltype == CONTROLTYPE_PING)
1369 // Just ignore it, the incoming data already reset
1370 // the timeout counter
1372 dout_con<<"PING"<<std::endl;
1373 throw ProcessedSilentlyException("Got a PING");
1375 else if(controltype == CONTROLTYPE_DISCO)
1377 // Unlike a PING, a DISCO is acted upon: the sending peer is
1378 // removed through deletePeer()
1380 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1382 if(deletePeer(peer_id, false) == false)
1384 PrintInfo(derr_con);
1385 derr_con<<"DISCO: Peer not found"<<std::endl;
1388 throw ProcessedSilentlyException("Got a DISCO");
1391 PrintInfo(derr_con);
1392 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1393 <<((int)controltype&0xff)<<std::endl;
1394 throw InvalidIncomingDataException("Invalid control type");
1397 else if(type == TYPE_ORIGINAL)
1399 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1400 throw InvalidIncomingDataException
1401 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1403 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1405 // Get the inside packet out and return it
1406 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1407 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1410 else if(type == TYPE_SPLIT)
1412 // We have to create a packet again for buffering
1413 // This isn't actually too bad an idea.
1414 BufferedPacket packet = makePacket(
1415 getPeer(peer_id)->address,
1420 // Buffer the packet
1421 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
// A non-empty result means this chunk completed the split packet
1422 if(data.getSize() != 0)
1425 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1426 <<"size="<<data.getSize()<<std::endl;
1430 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1431 throw ProcessedSilentlyException("Buffered a split packet chunk");
1433 else if(type == TYPE_RELIABLE)
1435 // Recursive reliable packets not allowed
// NOTE(review): the payload of a reliable packet comes from the network and
// is re-processed with reliable=true, so a peer that nests a TYPE_RELIABLE
// inside another one can reach this assert - consider rejecting such data
// with InvalidIncomingDataException instead.
1436 assert(reliable == false);
1438 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1439 throw InvalidIncomingDataException
1440 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1442 u16 seqnum = readU16(&packetdata[1]);
// Classify against the next expected sequence number of this channel
1444 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1445 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1448 if(is_future_packet)
1449 dout_con<<"BUFFERING";
1450 else if(is_old_packet)
1454 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1455 <<" next="<<channel->next_incoming_seqnum;
1456 dout_con<<" [sending CONTROLTYPE_ACK"
1457 " to peer_id="<<peer_id<<"]";
1458 dout_con<<std::endl;
1461 //assert(channel->incoming_reliables.size() < 100);
// The ACK is sent before the packet is classified, so old and future
// packets are acknowledged too; it goes out unreliably and bypasses the
// outgoing queue
1463 // Send a CONTROLTYPE_ACK
1464 SharedBuffer<u8> reply(4);
1465 writeU8(&reply[0], TYPE_CONTROL);
1466 writeU8(&reply[1], CONTROLTYPE_ACK);
1467 writeU16(&reply[2], seqnum);
1468 rawSendAsPacket(peer_id, channelnum, reply, false);
1470 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1471 if(is_future_packet)
1474 dout_con<<"Buffering reliable packet (seqnum="
1475 <<seqnum<<")"<<std::endl;*/
1477 // This one comes later, buffer it.
1478 // Actually we have to make a packet to buffer one.
1479 // Well, we have all the ingredients, so just do it.
1480 BufferedPacket packet = makePacket(
1481 getPeer(peer_id)->address,
1487 channel->incoming_reliables.insert(packet);
1490 dout_con<<"INCOMING: ";
1491 channel->incoming_reliables.print();
1492 dout_con<<std::endl;*/
1494 catch(AlreadyExistsException &e)
1498 throw ProcessedSilentlyException("Buffered future reliable packet");
1500 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1501 else if(is_old_packet)
1503 // An old packet, dump it
1504 throw InvalidIncomingDataException("Got an old reliable packet");
// Exactly the expected packet: advance and unwrap
1507 channel->next_incoming_seqnum++;
1509 // Get out the inside packet and re-process it
1510 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1511 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1513 return processPacket(channel, payload, peer_id, channelnum, true);
1517 PrintInfo(derr_con);
1518 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1519 throw InvalidIncomingDataException("Invalid packet type");
1522 // We should never get here.
1523 // If you get here, add an exception or a return to some of the
1524 // above conditionals.
1526 throw BaseException("Error in Channel::ProcessPacket()");
// Removes the peer stored under peer_id from m_peers.
//
// Visible steps: look the id up in m_peers, fetch its Peer pointer, pass
// (peer_id, timeout, peer->address) to e.peerRemoved(), then delete the Peer
// object and erase its map entry.
//
// NOTE(review): several lines of this function are not visible in this
// excerpt (the declaration of `e`, what is done with `e` afterwards, and the
// return statements). A caller in this file compares the result with `false`
// and then logs "DISCO: Peer not found", so false presumably means "no such
// peer" -- confirm against the complete source.
1529 bool Connection::deletePeer(u16 peer_id, bool timeout)
// Guard: peer_id has no entry in m_peers.
1531 if(m_peers.find(peer_id) == m_peers.end())
1534 Peer *peer = m_peers[peer_id];
// The peer's address is read here, before the Peer object is deleted below.
1538 e.peerRemoved(peer_id, timeout, peer->address);
// Free the Peer object first, then drop its (now dangling) pointer from the map.
1541 delete m_peers[peer_id];
1542 m_peers.erase(peer_id);
// Returns the next ConnectionEvent taken from m_event_queue with pop_front().
//
// When the queue is empty, the visible branch sets e.type to CONNEVENT_NONE
// instead of popping.
// NOTE(review): the declaration and the return of `e` are not visible in this
// excerpt; presumably that CONNEVENT_NONE event is what gets returned for an
// empty queue -- confirm.
1548 ConnectionEvent Connection::getEvent()
1550 if(m_event_queue.empty()){
1552 e.type = CONNEVENT_NONE;
1555 return m_event_queue.pop_front();
// Returns the next ConnectionEvent from m_event_queue, passing timeout_ms to
// pop_front().
//
// An ItemNotFoundException raised by that call is caught; the handler sets
// e.type to CONNEVENT_NONE.
// NOTE(review): presumably pop_front(timeout_ms) waits up to timeout_ms
// milliseconds and the CONNEVENT_NONE event is then returned; neither the
// queue implementation nor the handler's return is visible here -- confirm.
1558 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1561 return m_event_queue.pop_front(timeout_ms);
1562 } catch(ItemNotFoundException &ex){
1564 e.type = CONNEVENT_NONE;
// Appends command `c` to the back of m_command_queue.
1569 void Connection::putCommand(ConnectionCommand &c)
1571 m_command_queue.push_back(c);
// Takes the port to serve on.
// NOTE(review): only the construction of a ConnectionCommand is visible in
// this excerpt. Presumably the command is filled in with `port` and queued
// with putCommand(), like the other command wrappers in this file -- confirm
// against the complete source.
1574 void Connection::Serve(unsigned short port)
1576 ConnectionCommand c;
// Takes the address to connect to.
// NOTE(review): only the construction of a ConnectionCommand is visible in
// this excerpt. Presumably the command is filled in with `address` and queued
// with putCommand() -- confirm against the complete source.
1581 void Connection::Connect(Address address)
1583 ConnectionCommand c;
// Reports connection state from three checks, made after constructing a
// JMutexAutoLock on m_peers_mutex:
//   1. m_peers holds exactly one entry,
//   2. m_peers has an entry for PEER_ID_SERVER,
//   3. m_peer_id is not PEER_ID_INEXISTENT.
// NOTE(review): the statements executed when a check fails, and the final
// return, are not visible in this excerpt; presumably any failing check
// yields false and passing all three yields true -- confirm.
1588 bool Connection::Connected()
1590 JMutexAutoLock peerlock(m_peers_mutex);
// Condition is true when the peer count is anything other than one.
1592 if(m_peers.size() != 1)
1595 std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
// Condition is true when no entry for PEER_ID_SERVER exists.
1596 if(node == m_peers.end())
// Condition is true while our own peer id is still PEER_ID_INEXISTENT.
1599 if(m_peer_id == PEER_ID_INEXISTENT)
// NOTE(review): only the construction of a ConnectionCommand is visible in
// this excerpt. Presumably a disconnect command is set up and queued with
// putCommand() -- confirm against the complete source.
1605 void Connection::Disconnect()
1607 ConnectionCommand c;
// Fetches an event with waitEvent(m_bc_receive_timeout) and handles it by type.
//
// peer_id: output; set to the event's peer id for CONNEVENT_DATA_RECEIVED.
// data:    output; set to a SharedBuffer<u8> built from the event's data for
//          CONNEVENT_DATA_RECEIVED.
// Returns: e.data.getSize() for CONNEVENT_DATA_RECEIVED.
// Throws:  NoIncomingDataException for CONNEVENT_NONE (and from the final
//          statement of the function);
//          ConnectionBindFailed for CONNEVENT_BIND_FAILED.
//
// NOTE(review): the enclosing loop/switch headers and the statements that end
// the PEER_ADDED / PEER_REMOVED cases are not visible in this excerpt.
// Presumably those cases go on to fetch another event rather than returning
// -- confirm against the complete source.
1612 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1615 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
// Log every event except CONNEVENT_NONE.
1616 if(e.type != CONNEVENT_NONE)
1617 dout_con<<getDesc()<<": Receive: got event: "
1618 <<e.describe()<<std::endl;
1620 case CONNEVENT_NONE:
1621 throw NoIncomingDataException("No incoming data");
1622 case CONNEVENT_DATA_RECEIVED:
// Hand the sender id and the payload to the caller through the out-parameters.
1623 peer_id = e.peer_id;
1624 data = SharedBuffer<u8>(e.data);
1625 return e.data.getSize();
1626 case CONNEVENT_PEER_ADDED: {
// A temporary Peer built from the event is passed to the handler, if one is set.
1627 Peer tmp(e.peer_id, e.address);
1628 if(m_bc_peerhandler)
1629 m_bc_peerhandler->peerAdded(&tmp);
1631 case CONNEVENT_PEER_REMOVED: {
// Same pattern as above; the handler also receives the event's timeout flag.
1632 Peer tmp(e.peer_id, e.address);
1633 if(m_bc_peerhandler)
1634 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1636 case CONNEVENT_BIND_FAILED:
1637 throw ConnectionBindFailed("Failed to bind socket "
1638 "(port already in use?)");
1641 throw NoIncomingDataException("No incoming data");
// Builds a ConnectionCommand via c.sendToAll(channelnum, data, reliable).
//
// channelnum must be below CHANNEL_COUNT (enforced with assert).
// NOTE(review): the statement that hands `c` on is not visible in this
// excerpt; presumably it is queued with putCommand() -- confirm.
1644 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1646 assert(channelnum < CHANNEL_COUNT);
1648 ConnectionCommand c;
1649 c.sendToAll(channelnum, data, reliable);
// Builds a ConnectionCommand via c.send(peer_id, channelnum, data, reliable).
//
// channelnum must be below CHANNEL_COUNT (enforced with assert).
// NOTE(review): the statement that hands `c` on is not visible in this
// excerpt; presumably it is queued with putCommand() -- confirm.
1653 void Connection::Send(u16 peer_id, u8 channelnum,
1654 SharedBuffer<u8> data, bool reliable)
1656 assert(channelnum < CHANNEL_COUNT);
1658 ConnectionCommand c;
1659 c.send(peer_id, channelnum, data, reliable);
// NOTE(review): the body of this function is not visible in this excerpt, so
// its use of `dtime` cannot be described from here -- check the complete source.
1663 void Connection::RunTimeouts(float dtime)
// Returns the `address` member of the peer obtained from getPeer(peer_id).
// A JMutexAutoLock on m_peers_mutex is constructed before the lookup.
// NOTE(review): behaviour for an unknown peer_id depends on getPeer(), which
// is not visible in this excerpt.
1668 Address Connection::GetPeerAddress(u16 peer_id)
1670 JMutexAutoLock peerlock(m_peers_mutex);
1671 return getPeer(peer_id)->address;
// Returns the `avg_rtt` member of the peer obtained from getPeer(peer_id).
// A JMutexAutoLock on m_peers_mutex is constructed before the lookup.
// NOTE(review): behaviour for an unknown peer_id depends on getPeer(), which
// is not visible in this excerpt; the unit of avg_rtt is not shown here either.
1674 float Connection::GetPeerAvgRTT(u16 peer_id)
1676 JMutexAutoLock peerlock(m_peers_mutex);
1677 return getPeer(peer_id)->avg_rtt;
// Builds a ConnectionCommand via c.deletePeer(peer_id).
// Unlike the lower-case deletePeer(), nothing visible here touches m_peers
// directly.
// NOTE(review): the statement that hands `c` on is not visible in this
// excerpt; presumably it is queued with putCommand() -- confirm.
1680 void Connection::DeletePeer(u16 peer_id)
1682 ConnectionCommand c;
1683 c.deletePeer(peer_id);
// Writes getDesc() followed by ": " to `out`. No newline is written, so the
// caller's own message can follow on the same line.
1687 void Connection::PrintInfo(std::ostream &out)
1689 out<<getDesc()<<": ";
// Convenience overload: forwards to PrintInfo(std::ostream&) with dout_con.
1692 void Connection::PrintInfo()
1694 PrintInfo(dout_con);
// Returns a short description string of the form "con(<handle>/<peer id>)",
// where <handle> is itos(m_socket.GetHandle()) and <peer id> is itos(m_peer_id).
1697 std::string Connection::getDesc()
1699 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";