3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
// Reads the sender peer id: a u16 stored at byte offset 4 of a raw packet
// (makePacket() in this file writes sender_peer_id at that offset).
33 static u16 readPeerId(u8 *packetdata)
35 return readU16(&packetdata[4]);
// Reads the channel number: a u8 stored at byte offset 6 of a raw packet
// (makePacket() in this file writes the channel at that offset).
37 static u8 readChannel(u8 *packetdata)
39 return readU8(&packetdata[6]);
// Builds a BufferedPacket of datasize + BASE_HEADER_SIZE bytes.
// Base header written here: [0] u32 protocol_id, [4] u16 sender_peer_id,
// [6] u8 channel. The payload is copied in starting at BASE_HEADER_SIZE.
// NOTE(review): the statements that use `address` and return `p` are not
// present in this extract.
42 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
43 u32 protocol_id, u16 sender_peer_id, u8 channel)
45 u32 packet_size = datasize + BASE_HEADER_SIZE;
46 BufferedPacket p(packet_size);
49 writeU32(&p.data[0], protocol_id);
50 writeU16(&p.data[4], sender_peer_id);
51 writeU8(&p.data[6], channel);
53 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
// Convenience overload: forwards the SharedBuffer's data pointer and size to
// the pointer-based makePacket() with the same header fields.
58 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
59 u32 protocol_id, u16 sender_peer_id, u8 channel)
61 return makePacket(address, *data, data.getSize(),
62 protocol_id, sender_peer_id, channel);
// Wraps `data` in a new buffer that starts with a TYPE_ORIGINAL type byte at
// offset 0, followed by the payload at offset header_size.
// NOTE(review): the declaration of header_size is not present in this
// extract; makeAutoSplitPacket() in this file uses original_header_size = 1.
65 SharedBuffer<u8> makeOriginalPacket(
66 SharedBuffer<u8> data)
69 u32 packet_size = data.getSize() + header_size;
70 SharedBuffer<u8> b(packet_size);
72 writeU8(&b[0], TYPE_ORIGINAL);
74 memcpy(&b[header_size], *data, data.getSize());
// Splits `data` into a list of TYPE_SPLIT chunks.
// Each chunk starts with a 7-byte header: [0] u8 TYPE_SPLIT, [1] u16 seqnum,
// [3] u16 chunk_count, [5] u16 chunk_num, followed by at most
// (chunksize_max - 7) payload bytes.
// NOTE(review): the remaining parameters and the declarations of start, end,
// chunk_num and chunk_count are on lines not present in this extract.
// NOTE(review): `chunksize_max - chunk_header_size` and `data.getSize() - 1`
// are stored in u32; if chunksize_max < 7 or data is empty these would wrap
// around -- presumably callers never do that, confirm.
79 std::list<SharedBuffer<u8> > makeSplitPacket(
80 SharedBuffer<u8> data,
84 // Chunk packets, containing the TYPE_SPLIT header
85 std::list<SharedBuffer<u8> > chunks;
87 u32 chunk_header_size = 7;
88 u32 maximum_data_size = chunksize_max - chunk_header_size;
// `end` is the inclusive index of the last payload byte of this chunk,
// clamped to the last byte of data.
94 end = start + maximum_data_size - 1;
95 if(end > data.getSize() - 1)
96 end = data.getSize() - 1;
98 u32 payload_size = end - start + 1;
99 u32 packet_size = chunk_header_size + payload_size;
101 SharedBuffer<u8> chunk(packet_size);
103 writeU8(&chunk[0], TYPE_SPLIT);
104 writeU16(&chunk[1], seqnum);
105 // [3] u16 chunk_count is written at next stage
106 writeU16(&chunk[5], chunk_num);
107 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
109 chunks.push_back(chunk);
// Loop ends once the chunk that reaches the last byte of data was emitted.
115 while(end != data.getSize() - 1);
// Second pass: fill in chunk_count at offset 3 of every chunk.
117 for(std::list<SharedBuffer<u8> >::iterator i = chunks.begin();
118 i != chunks.end(); ++i)
121 writeU16(&((*i)[3]), chunk_count);
// Produces the list of packets needed to carry `data`:
// - via makeSplitPacket() when data plus the 1-byte original header exceeds
//   chunksize_max,
// - a single makeOriginalPacket() is pushed on the other visible path.
// NOTE(review): the remaining parameters, the else line and the return are
// not present in this extract.
127 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
128 SharedBuffer<u8> data,
132 u32 original_header_size = 1;
133 std::list<SharedBuffer<u8> > list;
134 if(data.getSize() + original_header_size > chunksize_max)
136 list = makeSplitPacket(data, chunksize_max, split_seqnum);
142 list.push_back(makeOriginalPacket(data));
// Wraps `data` in a reliable header: [0] u8 TYPE_RELIABLE, [1] u16 seqnum,
// then the payload at offset header_size.
// NOTE(review): the seqnum parameter and the header_size declaration are on
// lines not present in this extract.
147 SharedBuffer<u8> makeReliablePacket(
148 SharedBuffer<u8> data,
151 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
152 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
153 <<((unsigned int)data[0]&0xff)<<std::endl;*/
155 u32 packet_size = data.getSize() + header_size;
156 SharedBuffer<u8> b(packet_size);
158 writeU8(&b[0], TYPE_RELIABLE);
159 writeU16(&b[1], seqnum);
161 memcpy(&b[header_size], *data, data.getSize());
163 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
164 <<((unsigned int)data[0]&0xff)<<std::endl;*/
165 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
// Constructs an empty buffer; m_list_size starts at 0.
173 ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {}
// Debug helper: walks m_list and reads each packet's seqnum (u16 located
// right after the base header and the 1-byte packet type).
// NOTE(review): the statement that outputs `s` is not present in this extract.
175 void ReliablePacketBuffer::print()
177 for(std::list<BufferedPacket>::iterator i = m_list.begin();
181 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
// Returns true when no packets are buffered in m_list.
185 bool ReliablePacketBuffer::empty()
187 return m_list.empty();
// Returns the number of buffered packets.
// NOTE(review): the body is not present in this extract; presumably it
// returns m_list_size -- confirm.
189 u32 ReliablePacketBuffer::size()
// Linear search through m_list for a packet with the given seqnum; each
// packet's seqnum is read from offset BASE_HEADER_SIZE+1.
// NOTE(review): the comparison and the return statement are not present in
// this extract; popSeqnum() in this file dereferences the result as an
// iterator to a BufferedPacket.
193 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
195 std::list<BufferedPacket>::iterator i = m_list.begin();
196 for(; i != m_list.end(); ++i)
198 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
199 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
200 <<", comparing to s="<<s<<std::endl;*/
// Returns the search result value that means "no such packet".
// NOTE(review): the body is not present in this extract; presumably
// m_list.end() -- confirm.
206 RPBSearchResult ReliablePacketBuffer::notFound()
// Stores the seqnum of the first buffered packet in *result.
// NOTE(review): the empty-list check and the return statements are not
// present in this extract.
// NOTE(review): `p` is a by-value copy of the first BufferedPacket, made only
// to read two bytes from it.
210 bool ReliablePacketBuffer::getFirstSeqnum(u16 *result)
214 BufferedPacket p = *m_list.begin();
215 *result = readU16(&p.data[BASE_HEADER_SIZE+1]);
// Removes the first packet from m_list; a copy of it is taken before erasing.
// Throws NotFoundException("Buffer is empty"); the guarding condition and the
// return statement are not present in this extract.
218 BufferedPacket ReliablePacketBuffer::popFirst()
221 throw NotFoundException("Buffer is empty");
222 BufferedPacket p = *m_list.begin();
223 m_list.erase(m_list.begin());
// Looks up the packet with the given seqnum via findPacket() and copies it.
// Logs "Not found" and throws NotFoundException on the failure path.
// NOTE(review): the not-found condition, the erase and the return are not
// present in this extract.
227 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
229 RPBSearchResult r = findPacket(seqnum);
231 dout_con<<"Not found"<<std::endl;
232 throw NotFoundException("seqnum not found in buffer");
234 BufferedPacket p = *r;
// Inserts a TYPE_RELIABLE packet into m_list at the position determined by
// its seqnum.
// Preconditions (asserted): the packet holds at least the base header plus 3
// bytes, and its type byte is TYPE_RELIABLE.
// Throws AlreadyExistsException("Same seqnum in list") on the duplicate path.
// NOTE(review): the actual insert/push_back statements, the duplicate
// comparison and the m_list_size bookkeeping are not present in this extract.
239 void ReliablePacketBuffer::insert(BufferedPacket &p)
241 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
242 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
243 assert(type == TYPE_RELIABLE);
244 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
247 // Find the right place for the packet and insert it there
249 // If list is empty, just add it
256 // Otherwise find the right place
257 std::list<BufferedPacket>::iterator i = m_list.begin();
258 // Find the first packet in the list which has a higher seqnum
259 for(; i != m_list.end(); ++i){
260 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
263 throw AlreadyExistsException("Same seqnum in list");
// seqnum_higher() is defined elsewhere; it is used instead of a plain `>`,
// presumably to cope with u16 wrap-around -- confirm.
265 if(seqnum_higher(s, seqnum)){
269 // If we're at the end of the list, add the packet to the
271 if(i == m_list.end())
// Advances the timers of every buffered packet by dtime.
// Only the `totaltime` increment is visible; resetTimedOuts()/getTimedOuts()
// in this file also read a per-packet `time` field, whose increment is
// presumably on a line missing from this extract -- confirm.
281 void ReliablePacketBuffer::incrementTimeouts(float dtime)
283 for(std::list<BufferedPacket>::iterator i = m_list.begin();
284 i != m_list.end(); ++i)
287 i->totaltime += dtime;
// Visits every packet whose `time` has reached `timeout`.
// NOTE(review): the statement executed for such packets is not present in
// this extract; presumably it resets `time` -- confirm.
291 void ReliablePacketBuffer::resetTimedOuts(float timeout)
293 for(std::list<BufferedPacket>::iterator i = m_list.begin();
294 i != m_list.end(); ++i)
296 if(i->time >= timeout)
// Checks whether any buffered packet has a `totaltime` of at least `timeout`.
// NOTE(review): the return statements are not present in this extract.
301 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
303 for(std::list<BufferedPacket>::iterator i = m_list.begin();
304 i != m_list.end(); ++i)
306 if(i->totaltime >= timeout)
// Collects copies of all packets whose `time` has reached `timeout`.
// The packets stay in m_list; only copies go into the result list.
// NOTE(review): the return statement is not present in this extract.
312 std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
314 std::list<BufferedPacket> timed_outs;
315 for(std::list<BufferedPacket>::iterator i = m_list.begin();
316 i != m_list.end(); ++i)
318 if(i->time >= timeout)
319 timed_outs.push_back(*i);
// Destructor: iterates over every entry of m_buf.
// NOTE(review): the loop body is not present in this extract; since insert()
// in this file allocates entries with `new`, it presumably deletes them --
// confirm.
328 IncomingSplitBuffer::~IncomingSplitBuffer()
330 for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
331 i != m_buf.end(); ++i)
337 This will throw a GotSplitPacketException when a full
338 split packet is constructed.
// Stores one TYPE_SPLIT chunk and, once all chunks of its split packet have
// arrived, assembles the full payload.
// Chunk header (after the base header): [0] u8 type, [1] u16 seqnum,
// [3] u16 chunk_count, [5] u16 chunk_num.
// Returns an empty SharedBuffer when the chunk is a duplicate or the split
// packet is still incomplete.
// NOTE(review): the comment preceding this function mentions a
// GotSplitPacketException, but no throw is visible here; incompleteness is
// signalled by the empty return value.
// NOTE(review): the final return and the removal of `sp` from m_buf are on
// lines not present in this extract.
340 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
342 u32 headersize = BASE_HEADER_SIZE + 7;
343 assert(p.data.getSize() >= headersize);
344 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
345 assert(type == TYPE_SPLIT);
346 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
347 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
348 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
// NOTE(review): no check that chunk_num < chunk_count is visible in this
// extract; both values come straight from the packet.
350 // Add if doesn't exist
351 if(m_buf.find(seqnum) == m_buf.end())
353 IncomingSplitPacket *sp = new IncomingSplitPacket();
354 sp->chunk_count = chunk_count;
355 sp->reliable = reliable;
359 IncomingSplitPacket *sp = m_buf[seqnum];
361 // TODO: These errors should be thrown or something? Dunno.
// Mismatches against the first-seen chunk are only logged, not rejected.
362 if(chunk_count != sp->chunk_count)
363 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
364 <<" != sp->chunk_count="<<sp->chunk_count
366 if(reliable != sp->reliable)
367 derr_con<<"Connection: WARNING: reliable="<<reliable
368 <<" != sp->reliable="<<sp->reliable
371 // If chunk already exists, ignore it.
372 // Sometimes two identical packets may arrive when there is network
373 // lag and the server re-sends stuff.
374 if(sp->chunks.find(chunk_num) != sp->chunks.end())
375 return SharedBuffer<u8>();
377 // Cut chunk data out of packet
378 u32 chunkdatasize = p.data.getSize() - headersize;
379 SharedBuffer<u8> chunkdata(chunkdatasize);
380 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
382 // Set chunk data in buffer
383 sp->chunks[chunk_num] = chunkdata;
385 // If not all chunks are received, return empty buffer
386 if(sp->allReceived() == false)
387 return SharedBuffer<u8>();
389 // Calculate total size
391 for(std::map<u16, SharedBuffer<u8> >::iterator i = sp->chunks.begin();
392 i != sp->chunks.end(); ++i)
394 totalsize += i->second.getSize();
397 SharedBuffer<u8> fulldata(totalsize);
399 // Copy chunks to data buffer
// Chunks are concatenated in chunk_num order 0..chunk_count-1.
401 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
404 SharedBuffer<u8> buf = sp->chunks[chunk_i];
// NOTE(review): the size is narrowed to u16 here, while the same quantity is
// held in a u32 above; a chunk larger than 65535 bytes would be truncated.
405 u16 chunkdatasize = buf.getSize();
406 memcpy(&fulldata[start], *buf, chunkdatasize);
407 start += chunkdatasize;;
410 // Remove sp from buffer
// Drops incomplete unreliable split packets whose `time` reached `timeout`.
// Works in two phases: first collect the seqnums to remove, then remove them,
// so m_buf is not modified while it is being iterated.
// NOTE(review): the use of dtime, the statement after the reliable check and
// the actual removal are on lines not present in this extract.
416 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
418 std::list<u16> remove_queue;
419 for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
420 i != m_buf.end(); ++i)
422 IncomingSplitPacket *p = i->second;
423 // Reliable ones are not removed by timeout
424 if(p->reliable == true)
427 if(p->time >= timeout)
428 remove_queue.push_back(i->first);
430 for(std::list<u16>::iterator j = remove_queue.begin();
431 j != remove_queue.end(); ++j)
433 dout_con<<"NOTE: Removing timed out unreliable split packet"
446 next_outgoing_seqnum = SEQNUM_INITIAL;
447 next_incoming_seqnum = SEQNUM_INITIAL;
448 next_outgoing_split_seqnum = SEQNUM_INITIAL;
// Constructs a peer with its initial counters and congestion-control values.
// The congestion_control_* values set here are initial values only;
// Connection::runTimeouts() in this file overwrites them from g_settings.
// NOTE(review): several member initialisers are on lines not present in this
// extract.
458 Peer::Peer(u16 a_id, Address a_address):
461 timeout_counter(0.0),
465 has_sent_with_id(false),
467 m_max_packets_per_second(10),
470 congestion_control_aim_rtt(0.2),
471 congestion_control_max_rate(400),
472 congestion_control_min_rate(10)
// Feeds one round-trip-time sample into the peer's congestion control and
// resend-timeout calculation.
// Visible behaviour:
// - m_max_packets_per_second grows by 10 or by 2 while it is below
//   congestion_control_max_rate; on the remaining path it is scaled by 0.8
//   and kept at or above congestion_control_min_rate,
// - avg_rtt is blended as 10% new sample + 90% previous value,
// - resend_timeout = avg_rtt * RESEND_TIMEOUT_FACTOR, limited to the range
//   [RESEND_TIMEOUT_MIN, RESEND_TIMEOUT_MAX].
// NOTE(review): the first branch condition and the body of the
// `avg_rtt < 0.0` branch are on lines not present in this extract.
479 void Peer::reportRTT(float rtt)
483 if(m_max_packets_per_second < congestion_control_max_rate)
484 m_max_packets_per_second += 10;
485 } else if(rtt < congestion_control_aim_rtt){
486 if(m_max_packets_per_second < congestion_control_max_rate)
487 m_max_packets_per_second += 2;
489 m_max_packets_per_second *= 0.8;
490 if(m_max_packets_per_second < congestion_control_min_rate)
491 m_max_packets_per_second = congestion_control_min_rate;
497 else if(avg_rtt < 0.0)
500 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
502 // Calculate resend_timeout
504 /*int reliable_count = 0;
505 for(int i=0; i<CHANNEL_COUNT; i++)
507 reliable_count += channels[i].outgoing_reliables.size();
509 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
510 * ((float)reliable_count * 1);*/
512 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
513 if(timeout < RESEND_TIMEOUT_MIN)
514 timeout = RESEND_TIMEOUT_MIN;
515 if(timeout > RESEND_TIMEOUT_MAX)
516 timeout = RESEND_TIMEOUT_MAX;
517 resend_timeout = timeout;
// Constructor without a peer handler: m_bc_peerhandler is set to NULL and the
// socket timeout to 5 ms.
// NOTE(review): the rest of the parameter list and several member
// initialisers are on lines not present in this extract.
524 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
526 m_protocol_id(protocol_id),
527 m_max_packet_size(max_packet_size),
531 m_bc_peerhandler(NULL),
532 m_bc_receive_timeout(0),
535 m_socket.setTimeoutMs(5);
// Constructor taking a PeerHandler: identical visible initialisation to the
// other overload, except that m_bc_peerhandler stores `peerhandler`.
// NOTE(review): several member initialisers (including whatever consumes
// `timeout` and `ipv6`) are on lines not present in this extract.
540 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
541 bool ipv6, PeerHandler *peerhandler):
542 m_protocol_id(protocol_id),
543 m_max_packet_size(max_packet_size),
547 m_bc_peerhandler(peerhandler),
548 m_bc_receive_timeout(0),
551 m_socket.setTimeoutMs(5);
// Destructor: iterates over every entry of m_peers.
// NOTE(review): the statements before the loop and the loop body are not
// present in this extract; since peers are allocated with `new` elsewhere in
// this file, the loop presumably deletes them -- confirm.
557 Connection::~Connection()
561 for(std::map<u16, Peer*>::iterator
563 j != m_peers.end(); ++j)
// Thread entry point. Runs until StopRequested() returns true.
// Each iteration computes dtime (seconds) from the millisecond clock and
// drains m_command_queue; exceptions inside an iteration are handled by the
// BEGIN/END_DEBUG_EXCEPTION_HANDLER macros.
// NOTE(review): the calls made with dtime and with each popped command are on
// lines not present in this extract.
571 void * Connection::Thread()
574 log_register_thread("Connection");
576 dout_con<<"Connection thread started"<<std::endl;
578 u32 curtime = porting::getTimeMs();
579 u32 lasttime = curtime;
581 while(!StopRequested())
583 BEGIN_DEBUG_EXCEPTION_HANDLER
586 curtime = porting::getTimeMs();
// u32 subtraction, converted from milliseconds to seconds.
587 float dtime = (float)(curtime - lasttime) / 1000.;
595 //NOTE this is only thread safe for ONE consumer thread!
596 while(!m_command_queue.empty()){
597 ConnectionCommand c = m_command_queue.pop_frontNoEx();
605 END_DEBUG_EXCEPTION_HANDLER(derr_con);
// Appends an event to m_event_queue. Events of type CONNEVENT_NONE are a
// programming error (asserted).
611 void Connection::putEvent(ConnectionEvent &e)
613 assert(e.type != CONNEVENT_NONE);
614 m_event_queue.push_back(e);
// Dispatches one ConnectionCommand to the matching Connection method, logging
// the command type first.
// Visible dispatches: CONNCMD_SEND -> send(), CONNCMD_SEND_TO_ALL ->
// sendToAll(), CONNCMD_DELETE_PEER -> deletePeer(peer_id, false).
// NOTE(review): the switch header, some case labels and the calls for
// SERVE/CONNECT/DISCONNECT are on lines not present in this extract.
617 void Connection::processCommand(ConnectionCommand &c)
621 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
624 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
628 case CONNCMD_CONNECT:
629 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
632 case CONNCMD_DISCONNECT:
633 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
637 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
638 send(c.peer_id, c.channelnum, c.data, c.reliable);
640 case CONNCMD_SEND_TO_ALL:
641 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
642 sendToAll(c.channelnum, c.data, c.reliable);
644 case CONNCMD_DELETE_PEER:
645 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
646 deletePeer(c.peer_id, false);
// Rate-limited flush of m_outgoing_queue, called with the elapsed time.
// 1. Per peer: add dtime to m_sendtime_accu, reset m_num_sent and compute the
//    budget m_max_num_sent = m_sendtime_accu * m_max_packets_per_second.
// 2. Drain the queue: a packet is postponed when its channel already holds
//    5 or more outgoing reliables or when the peer's budget is used up;
//    otherwise it is sent with rawSendAsPacket().
// 3. Postponed packets are pushed back onto m_outgoing_queue in order.
// 4. Per peer: subtract the time "spent" on sent packets from the accumulator
//    and cap it at 10 packets' worth of time.
// NOTE(review): lines are missing from this extract between getPeerNoEx() and
// the first dereference of `peer`, and after rawSendAsPacket(); a null check
// and the m_num_sent increment are presumably there -- confirm.
651 void Connection::send(float dtime)
653 for(std::map<u16, Peer*>::iterator
655 j != m_peers.end(); ++j)
657 Peer *peer = j->second;
658 peer->m_sendtime_accu += dtime;
659 peer->m_num_sent = 0;
660 peer->m_max_num_sent = peer->m_sendtime_accu *
661 peer->m_max_packets_per_second;
663 Queue<OutgoingPacket> postponed_packets;
664 while(!m_outgoing_queue.empty()){
665 OutgoingPacket packet = m_outgoing_queue.pop_front();
666 Peer *peer = getPeerNoEx(packet.peer_id);
669 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
670 postponed_packets.push_back(packet);
671 } else if(peer->m_num_sent < peer->m_max_num_sent){
672 rawSendAsPacket(packet.peer_id, packet.channelnum,
673 packet.data, packet.reliable);
676 postponed_packets.push_back(packet);
679 while(!postponed_packets.empty()){
680 m_outgoing_queue.push_back(postponed_packets.pop_front());
682 for(std::map<u16, Peer*>::iterator
684 j != m_peers.end(); ++j)
686 Peer *peer = j->second;
687 peer->m_sendtime_accu -= (float)peer->m_num_sent /
688 peer->m_max_packets_per_second;
689 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
690 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
694 // Receive packets from the network and buffers and create ConnectionEvents
// Receives packets from the peers' buffers and from the socket, and turns
// them into ConnectionEvents.
// Visible flow per iteration (at most 1000 iterations per call):
// 1. getFromBuffers(): deliver already-buffered data as a dataReceived event.
// 2. Wait for / read one datagram from m_socket.
// 3. Validate size, protocol id and channel number.
// 4. peer_id == PEER_ID_INEXISTENT: try to match the sender address against
//    known peers with has_sent_with_id == false; otherwise allocate a new
//    peer id, create the Peer, emit a peerAdded event and queue a reliable
//    CONTROLTYPE_SET_PEER_ID reply.
// 5. Look up the peer, check the sender address, reset its timeout counter.
// 6. Strip the base header and hand the rest to processPacket(); non-silent
//    results become dataReceived events.
// Throws InvalidIncomingDataException for an invalid channel or unknown peer
// id; handlers for it and for ProcessedSilentlyException are visible at the
// end of the function.
// NOTE(review): many lines (braces, continue/break/return statements,
// comment delimiters, event declarations) are missing from this extract, so
// the exact control flow cannot be confirmed from here.
695 void Connection::receive()
697 u32 datasize = m_max_packet_size * 2; // Double it just to be safe
698 // TODO: We can not know how many layers of header there are.
699 // For now, just assume there are no other than the base headers.
700 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
701 SharedBuffer<u8> packetdata(packet_maxsize);
703 bool single_wait_done = false;
705 for(u32 loop_i=0; loop_i<1000; loop_i++) // Limit in case of DoS
708 /* Check if some buffer has relevant data */
711 SharedBuffer<u8> resultdata;
712 bool got = getFromBuffers(peer_id, resultdata);
715 e.dataReceived(peer_id, resultdata);
// After the first pass, later passes only poll the socket with a zero wait.
721 if(single_wait_done){
722 if(m_socket.WaitData(0) == false)
726 single_wait_done = true;
729 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
731 if(received_size < 0)
733 if(received_size < BASE_HEADER_SIZE)
735 if(readU32(&packetdata[0]) != m_protocol_id)
738 u16 peer_id = readPeerId(*packetdata);
739 u8 channelnum = readChannel(*packetdata);
740 if(channelnum > CHANNEL_COUNT-1){
// NOTE(review): channelnum is a u8 streamed without a cast, so it is likely
// printed as a character; other log lines in this file use
// ((int)channel&0xff).
742 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
743 throw InvalidIncomingDataException("Channel doesn't exist");
746 if(peer_id == PEER_ID_INEXISTENT)
749 Somebody is trying to send stuff to us with no peer id.
751 Check if the same address and port was added to our peer
753 Allow only entries that have has_sent_with_id==false.
756 std::map<u16, Peer*>::iterator j;
758 for(; j != m_peers.end(); ++j)
760 Peer *peer = j->second;
761 if(peer->has_sent_with_id)
763 if(peer->address == sender)
768 If no peer was found with the same address and port,
769 we shall assume it is a new peer and create an entry.
771 if(j == m_peers.end())
773 // Pass on to adding the peer
775 // Else: A peer was found.
778 Peer *peer = j->second;
781 derr_con<<"WARNING: Assuming unknown peer to be "
782 <<"peer_id="<<peer_id<<std::endl;
787 The peer was not found in our lists. Add it.
789 if(peer_id == PEER_ID_INEXISTENT)
791 // Somebody wants to make a new connection
793 // Get a unique peer id (2 or higher)
796 Find an unused peer id
798 bool out_of_ids = false;
802 if(m_peers.find(peer_id_new) == m_peers.end())
804 // Check for overflow
805 if(peer_id_new == 65535){
812 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
817 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
818 " giving peer_id="<<peer_id_new<<std::endl;
821 Peer *peer = new Peer(peer_id_new, sender);
822 m_peers[peer->id] = peer;
824 // Create peer addition event
826 e.peerAdded(peer_id_new, sender);
829 // Create CONTROL packet to tell the peer id to the new peer.
830 SharedBuffer<u8> reply(4);
831 writeU8(&reply[0], TYPE_CONTROL);
832 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
833 writeU16(&reply[2], peer_id_new);
834 sendAsPacket(peer_id_new, 0, reply, true);
836 // We're now talking to a valid peer_id
837 peer_id = peer_id_new;
839 // Go on and process whatever it sent
842 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
844 if(node == m_peers.end())
847 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
848 // and it is invalid.
850 derr_con<<"Receive(): Peer not found"<<std::endl;
851 throw InvalidIncomingDataException("Peer not found (possible timeout)");
854 Peer *peer = node->second;
856 // Validate peer address
857 if(peer->address != sender)
860 derr_con<<"Peer "<<peer_id<<" sending from different address."
861 " Ignoring."<<std::endl;
865 peer->timeout_counter = 0.0;
867 Channel *channel = &(peer->channels[channelnum]);
869 // Throw the received packet to channel->processPacket()
871 // Make a new SharedBuffer from the data without the base headers
872 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
873 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
874 strippeddata.getSize());
877 // Process it (the result is some data with no headers made by us)
878 SharedBuffer<u8> resultdata = processPacket
879 (channel, strippeddata, peer_id, channelnum, false);
882 dout_con<<"ProcessPacket returned data of size "
883 <<resultdata.getSize()<<std::endl;
886 e.dataReceived(peer_id, resultdata);
889 }catch(ProcessedSilentlyException &e){
891 }catch(InvalidIncomingDataException &e){
893 catch(ProcessedSilentlyException &e){
// Periodic maintenance for all peers, called with the elapsed time dtime.
// Per peer:
// - refresh the congestion_control_* values from g_settings,
// - advance timeout_counter; past m_timeout the peer is queued for removal,
// - per channel: drop timed-out unreliable split packets, advance reliable
//   packet timers, queue the peer for removal if any reliable packet's total
//   time reached m_timeout, and otherwise re-send reliables that reached the
//   peer's resend_timeout (reporting that timeout as an RTT sample),
// - send a reliable CONTROLTYPE_PING on channel 0 once ping_timer reaches 5.0.
// Finally every queued peer is removed with deletePeer(id, true).
// NOTE(review): the actual re-send call and several continue/goto/brace lines
// are missing from this extract.
898 void Connection::runTimeouts(float dtime)
900 float congestion_control_aim_rtt
901 = g_settings->getFloat("congestion_control_aim_rtt");
902 float congestion_control_max_rate
903 = g_settings->getFloat("congestion_control_max_rate");
904 float congestion_control_min_rate
905 = g_settings->getFloat("congestion_control_min_rate");
907 std::list<u16> timeouted_peers;
908 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
909 j != m_peers.end(); ++j)
911 Peer *peer = j->second;
913 // Update congestion control values
914 peer->congestion_control_aim_rtt = congestion_control_aim_rtt;
915 peer->congestion_control_max_rate = congestion_control_max_rate;
916 peer->congestion_control_min_rate = congestion_control_min_rate;
921 peer->timeout_counter += dtime;
922 if(peer->timeout_counter > m_timeout)
925 derr_con<<"RunTimeouts(): Peer "<<peer->id
927 <<" (source=peer->timeout_counter)"
929 // Add peer to the list
930 timeouted_peers.push_back(peer->id);
931 // Don't bother going through the buffers of this one
// Cached so that reportRTT() calls below do not change the timeout used for
// the remaining channels of this peer.
935 float resend_timeout = peer->resend_timeout;
936 for(u16 i=0; i<CHANNEL_COUNT; i++)
938 std::list<BufferedPacket> timed_outs;
940 Channel *channel = &peer->channels[i];
942 // Remove timed out incomplete unreliable split packets
943 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
945 // Increment reliable packet times
946 channel->outgoing_reliables.incrementTimeouts(dtime);
948 // Check reliable packet total times, remove peer if
950 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
953 derr_con<<"RunTimeouts(): Peer "<<peer->id
955 <<" (source=reliable packet totaltime)"
957 // Add peer to the to-be-removed list
958 timeouted_peers.push_back(peer->id);
962 // Re-send timed out outgoing reliables
964 timed_outs = channel->
965 outgoing_reliables.getTimedOuts(resend_timeout);
967 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
// NOTE(review): this inner `j` shadows the peer iterator `j` of the outer
// loop, and the u8 `channel` below shadows the `Channel *channel` above.
969 for(std::list<BufferedPacket>::iterator j = timed_outs.begin();
970 j != timed_outs.end(); ++j)
972 u16 peer_id = readPeerId(*(j->data));
973 u8 channel = readChannel(*(j->data));
974 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
977 derr_con<<"RE-SENDING timed-out RELIABLE to ";
978 j->address.print(&derr_con);
979 derr_con<<"(t/o="<<resend_timeout<<"): "
980 <<"from_peer_id="<<peer_id
981 <<", channel="<<((int)channel&0xff)
982 <<", seqnum="<<seqnum
987 // Enlarge avg_rtt and resend_timeout:
988 // The rtt will be at least the timeout.
989 // NOTE: This won't affect the timeout of the next
990 // checked channel because it was cached.
991 peer->reportRTT(resend_timeout);
998 peer->ping_timer += dtime;
999 if(peer->ping_timer >= 5.0)
1001 // Create and send PING packet
1002 SharedBuffer<u8> data(2);
1003 writeU8(&data[0], TYPE_CONTROL);
1004 writeU8(&data[1], CONTROLTYPE_PING);
1005 rawSendAsPacket(peer->id, 0, data, true);
1007 peer->ping_timer = 0.0;
1014 // Remove timed out peers
// Removal is deferred to here so m_peers is not modified while iterated above.
1015 for(std::list<u16>::iterator i = timeouted_peers.begin();
1016 i != timeouted_peers.end(); ++i)
1018 PrintInfo(derr_con);
1019 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1020 deletePeer(*i, true);
// Starts serving: binds m_socket to `port` and sets this connection's own
// peer id to PEER_ID_SERVER.
// NOTE(review): the body of the SocketException handler is not present in
// this extract.
1024 void Connection::serve(u16 port)
1026 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1028 m_socket.Bind(port);
1029 m_peer_id = PEER_ID_SERVER;
1031 catch(SocketException &e){
// Connects to a server at `address`.
// Registers the server as peer PEER_ID_SERVER, emits a peerAdded event, sets
// our own id to PEER_ID_INEXISTENT and sends an empty reliable packet on
// channel 0 to the server.
// Throws ConnectionException if a PEER_ID_SERVER peer already exists.
// NOTE(review): the event declaration and its queuing are on lines not
// present in this extract. `Send` (capitalised) is declared elsewhere.
1039 void Connection::connect(Address address)
1041 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1042 <<":"<<address.getPort()<<std::endl;
1044 std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1045 if(node != m_peers.end()){
1046 throw ConnectionException("Already connected to a server");
1049 Peer *peer = new Peer(PEER_ID_SERVER, address);
1050 m_peers[peer->id] = peer;
1054 e.peerAdded(peer->id, peer->address);
1059 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1060 m_peer_id = PEER_ID_INEXISTENT;
1061 SharedBuffer<u8> data(0);
1062 Send(PEER_ID_SERVER, 0, data, true);
// Notifies every known peer that we are disconnecting: a 2-byte
// TYPE_CONTROL / CONTROLTYPE_DISCO packet is sent on channel 0, unreliably
// and bypassing the outgoing queue (rawSendAsPacket with reliable=false).
1065 void Connection::disconnect()
1067 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1069 // Create and send DISCO packet
1070 SharedBuffer<u8> data(2);
1071 writeU8(&data[0], TYPE_CONTROL);
1072 writeU8(&data[1], CONTROLTYPE_DISCO);
1075 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1076 j != m_peers.end(); ++j)
1078 Peer *peer = j->second;
1079 rawSendAsPacket(peer->id, 0, data, false);
// Sends the same data to every peer in m_peers by calling send() per peer.
1083 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1085 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1086 j != m_peers.end(); ++j)
1088 Peer *peer = j->second;
1089 send(peer->id, channelnum, data, reliable);
// Queues `data` for sending to one peer on one channel.
// The payload is turned into one or more packets by makeAutoSplitPacket(),
// using at most (m_max_packet_size - BASE_HEADER_SIZE) bytes per packet, and
// each resulting packet is queued with sendAsPacket().
// channelnum must be below CHANNEL_COUNT (asserted).
// NOTE(review): lines are missing from this extract after getPeerNoEx()
// (presumably a null check) and before the RELIABLE_HEADER_SIZE subtraction
// (presumably `if(reliable)`) -- confirm.
1093 void Connection::send(u16 peer_id, u8 channelnum,
1094 SharedBuffer<u8> data, bool reliable)
1096 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1098 assert(channelnum < CHANNEL_COUNT);
1100 Peer *peer = getPeerNoEx(peer_id);
1103 Channel *channel = &(peer->channels[channelnum]);
1105 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1107 chunksize_max -= RELIABLE_HEADER_SIZE;
1109 std::list<SharedBuffer<u8> > originals;
1110 originals = makeAutoSplitPacket(data, chunksize_max,
1111 channel->next_outgoing_split_seqnum);
1113 for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
1114 i != originals.end(); ++i)
1116 SharedBuffer<u8> original = *i;
1118 sendAsPacket(peer_id, channelnum, original, reliable);
// Queues one ready-made packet payload on m_outgoing_queue; the rate-limited
// Connection::send(float) in this file drains that queue.
1122 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1123 SharedBuffer<u8> data, bool reliable)
1125 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1126 m_outgoing_queue.push_back(packet);
// Wraps `data` in the base header (and, on the reliable path, a reliable
// header with the channel's next outgoing seqnum) to form a BufferedPacket.
// On the reliable path the packet is also stored in
// channel->outgoing_reliables; a duplicate seqnum there is only logged as a
// warning.
// NOTE(review): the `if(reliable)`/else lines, a possible null check after
// getPeerNoEx() and the rawSend() calls are missing from this extract.
1129 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1130 SharedBuffer<u8> data, bool reliable)
1132 Peer *peer = getPeerNoEx(peer_id);
1135 Channel *channel = &(peer->channels[channelnum]);
1139 u16 seqnum = channel->next_outgoing_seqnum;
1140 channel->next_outgoing_seqnum++;
// NOTE(review): this local buffer is named `reliable` like the bool
// parameter of this function.
1142 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1144 // Add base headers and make a packet
1145 BufferedPacket p = makePacket(peer->address, reliable,
1146 m_protocol_id, m_peer_id, channelnum);
1149 // Buffer the packet
1150 channel->outgoing_reliables.insert(p);
1152 catch(AlreadyExistsException &e)
1154 PrintInfo(derr_con);
1155 derr_con<<"WARNING: Going to send a reliable packet "
1156 "seqnum="<<seqnum<<" that is already "
1157 "in outgoing buffer"<<std::endl;
1166 // Add base headers and make a packet
1167 BufferedPacket p = makePacket(peer->address, data,
1168 m_protocol_id, m_peer_id, channelnum);
// Writes a finished packet to the socket. A SendFailedException is caught and
// logged with the destination address; it is not propagated by the visible
// handler.
1175 void Connection::rawSend(const BufferedPacket &packet)
1178 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1179 } catch(SendFailedException &e){
1180 derr_con<<"Connection::rawSend(): SendFailedException: "
1181 <<packet.address.serializeString()<<std::endl;
// Returns the Peer registered under peer_id.
// Throws PeerNotFoundException when the id is not in m_peers.
1185 Peer* Connection::getPeer(u16 peer_id)
1187 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1189 if(node == m_peers.end()){
1190 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
// Sanity check: the map key must match the id stored in the Peer.
1194 assert(node->second->id == peer_id);
1196 return node->second;
// Non-throwing variant of getPeer(): returns the Peer registered under
// peer_id.
// NOTE(review): the statement executed when the peer is missing is not
// present in this extract; presumably it returns NULL -- confirm.
1199 Peer* Connection::getPeerNoEx(u16 peer_id)
1201 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1203 if(node == m_peers.end()){
1208 assert(node->second->id == peer_id);
1210 return node->second;
// Builds a list of the Peer pointers stored in m_peers, in map (peer id)
// order.
// NOTE(review): the return statement is not present in this extract.
1213 std::list<Peer*> Connection::getPeers()
1215 std::list<Peer*> list;
1216 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1217 j != m_peers.end(); ++j)
1219 Peer *peer = j->second;
1220 list.push_back(peer);
// Scans every channel of every peer for buffered incoming data that is ready
// to be delivered, using checkIncomingBuffers().
// peer_id and dst are output parameters.
// NOTE(review): the handling of `got` (copy into dst and the return
// statements) is on lines not present in this extract.
1225 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1227 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1228 j != m_peers.end(); ++j)
1230 Peer *peer = j->second;
1231 for(u16 i=0; i<CHANNEL_COUNT; i++)
1233 Channel *channel = &peer->channels[i];
1234 SharedBuffer<u8> resultdata;
1235 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
// Checks one channel's incoming reliable buffer for the packet that is next
// in sequence.
// First drops buffered packets that are older than
// channel->next_incoming_seqnum. If the first remaining packet has exactly
// the expected seqnum, it is popped, next_incoming_seqnum is advanced, the
// base and reliable headers are stripped and the payload is re-processed via
// processPacket() (reliable=true) into dst. peer_id is set from that packet.
// NOTE(review): the loop construct around the clean-up and the return
// statements are on lines not present in this extract.
1245 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1246 SharedBuffer<u8> &dst)
1248 u16 firstseqnum = 0;
1249 // Clear old packets from start of buffer
1251 bool found = channel->incoming_reliables.getFirstSeqnum(&firstseqnum);
1254 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1255 channel->incoming_reliables.popFirst();
1259 // This happens if all packets are old
1261 if(channel->incoming_reliables.empty() == false)
1263 if(firstseqnum == channel->next_incoming_seqnum)
1265 BufferedPacket p = channel->incoming_reliables.popFirst();
1267 peer_id = readPeerId(*p.data);
1268 u8 channelnum = readChannel(*p.data);
1269 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1272 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1273 <<" seqnum="<<seqnum
1274 <<" peer_id="<<peer_id
1275 <<" channel="<<((int)channelnum&0xff)
1278 channel->next_incoming_seqnum++;
1280 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1281 // Get out the inside packet and re-process it
1282 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1283 memcpy(*payload, &p.data[headers_size], payload.getSize());
1285 dst = processPacket(channel, payload, peer_id, channelnum, true);
1292 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1293 SharedBuffer<u8> packetdata, u16 peer_id,
1294 u8 channelnum, bool reliable)
1296 IndentationRaiser iraiser(&(m_indentation));
1298 if(packetdata.getSize() < 1)
1299 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1301 u8 type = readU8(&packetdata[0]);
1303 if(type == TYPE_CONTROL)
1305 if(packetdata.getSize() < 2)
1306 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1308 u8 controltype = readU8(&packetdata[1]);
1310 if(controltype == CONTROLTYPE_ACK)
1312 if(packetdata.getSize() < 4)
1313 throw InvalidIncomingDataException
1314 ("packetdata.getSize() < 4 (ACK header size)");
1316 u16 seqnum = readU16(&packetdata[2]);
1318 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1319 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1320 <<", seqnum="<<seqnum<<std::endl;
1323 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1324 // Get round trip time
1325 float rtt = p.totaltime;
1327 // Let peer calculate stuff according to it
1328 // (avg_rtt and resend_timeout)
1329 Peer *peer = getPeer(peer_id);
1330 peer->reportRTT(rtt);
1332 //PrintInfo(dout_con);
1333 //dout_con<<"RTT = "<<rtt<<std::endl;
1335 /*dout_con<<"OUTGOING: ";
1337 channel->outgoing_reliables.print();
1338 dout_con<<std::endl;*/
1340 catch(NotFoundException &e){
1341 PrintInfo(derr_con);
1342 derr_con<<"WARNING: ACKed packet not "
1347 throw ProcessedSilentlyException("Got an ACK");
1349 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1351 if(packetdata.getSize() < 4)
1352 throw InvalidIncomingDataException
1353 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1354 u16 peer_id_new = readU16(&packetdata[2]);
1356 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1358 if(GetPeerID() != PEER_ID_INEXISTENT)
1360 PrintInfo(derr_con);
1361 derr_con<<"WARNING: Not changing"
1362 " existing peer id."<<std::endl;
1366 dout_con<<"changing."<<std::endl;
1367 SetPeerID(peer_id_new);
1369 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1371 else if(controltype == CONTROLTYPE_PING)
1373 // Just ignore it, the incoming data already reset
1374 // the timeout counter
1376 dout_con<<"PING"<<std::endl;
1377 throw ProcessedSilentlyException("Got a PING");
1379 else if(controltype == CONTROLTYPE_DISCO)
1381 // Just ignore it, the incoming data already reset
1382 // the timeout counter
1384 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1386 if(deletePeer(peer_id, false) == false)
1388 PrintInfo(derr_con);
1389 derr_con<<"DISCO: Peer not found"<<std::endl;
1392 throw ProcessedSilentlyException("Got a DISCO");
1395 PrintInfo(derr_con);
1396 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1397 <<((int)controltype&0xff)<<std::endl;
1398 throw InvalidIncomingDataException("Invalid control type");
1401 else if(type == TYPE_ORIGINAL)
1403 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1404 throw InvalidIncomingDataException
1405 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1407 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1409 // Get the inside packet out and return it
1410 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1411 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1414 else if(type == TYPE_SPLIT)
1416 // We have to create a packet again for buffering
1417 // This isn't actually too bad an idea.
1418 BufferedPacket packet = makePacket(
1419 getPeer(peer_id)->address,
1424 // Buffer the packet
1425 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1426 if(data.getSize() != 0)
1429 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1430 <<"size="<<data.getSize()<<std::endl;
1434 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1435 throw ProcessedSilentlyException("Buffered a split packet chunk");
1437 else if(type == TYPE_RELIABLE)
1439 // Recursive reliable packets not allowed
1441 throw InvalidIncomingDataException("Found nested reliable packets");
1443 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1444 throw InvalidIncomingDataException
1445 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1447 u16 seqnum = readU16(&packetdata[1]);
1449 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1450 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1453 if(is_future_packet)
1454 dout_con<<"BUFFERING";
1455 else if(is_old_packet)
1459 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1460 <<" next="<<channel->next_incoming_seqnum;
1461 dout_con<<" [sending CONTROLTYPE_ACK"
1462 " to peer_id="<<peer_id<<"]";
1463 dout_con<<std::endl;
1466 //assert(channel->incoming_reliables.size() < 100);
1468 // Send a CONTROLTYPE_ACK
1469 SharedBuffer<u8> reply(4);
1470 writeU8(&reply[0], TYPE_CONTROL);
1471 writeU8(&reply[1], CONTROLTYPE_ACK);
1472 writeU16(&reply[2], seqnum);
1473 rawSendAsPacket(peer_id, channelnum, reply, false);
1475 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1476 if(is_future_packet)
1479 dout_con<<"Buffering reliable packet (seqnum="
1480 <<seqnum<<")"<<std::endl;*/
1482 // This one comes later, buffer it.
1483 // Actually we have to make a packet to buffer one.
1484 // Well, we have all the ingredients, so just do it.
1485 BufferedPacket packet = makePacket(
1486 getPeer(peer_id)->address,
1492 channel->incoming_reliables.insert(packet);
1495 dout_con<<"INCOMING: ";
1496 channel->incoming_reliables.print();
1497 dout_con<<std::endl;*/
1499 catch(AlreadyExistsException &e)
1503 throw ProcessedSilentlyException("Buffered future reliable packet");
1505 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1506 else if(is_old_packet)
1508 // An old packet, dump it
1509 throw InvalidIncomingDataException("Got an old reliable packet");
1512 channel->next_incoming_seqnum++;
1514 // Get out the inside packet and re-process it
1515 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1516 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1518 return processPacket(channel, payload, peer_id, channelnum, true);
1522 PrintInfo(derr_con);
1523 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1524 throw InvalidIncomingDataException("Invalid packet type");
1527 // We should never get here.
1528 // If you get here, add an exception or a return to some of the
1529 // above conditionals.
1531 throw BaseException("Error in Channel::ProcessPacket()");
// Removes the peer registered under peer_id: looks it up in m_peers, fills an
// event through peerRemoved(peer_id, timeout, peer->address), then deletes the
// Peer object and erases its map entry.
// NOTE(review): this listing does not show the declaration of `e`, what is
// done with `e` afterwards, or any return statement. A caller compares the
// result against false to report "Peer not found", so presumably false means
// the id was unknown -- confirm against the full file.
1534 bool Connection::deletePeer(u16 peer_id, bool timeout)
// Guard for an id that has no entry in m_peers.
1536 if(m_peers.find(peer_id) == m_peers.end())
1539 Peer *peer = m_peers[peer_id];
// Capture the peer's address in the event before the Peer is destroyed below.
1543 e.peerRemoved(peer_id, timeout, peer->address);
// Free the Peer object first, then drop its (now dangling) map entry.
1546 delete m_peers[peer_id];
1547 m_peers.erase(peer_id);
// Fetches the next event from m_event_queue using pop_frontNoEx().
// When the queue is empty, an event whose type is CONNEVENT_NONE is prepared
// instead.
// NOTE(review): the declaration of `e` and the statement that hands it back
// are not visible in this listing -- presumably it is returned; confirm.
1553 ConnectionEvent Connection::getEvent()
1555 if(m_event_queue.empty()){
1557 e.type = CONNEVENT_NONE;
1560 return m_event_queue.pop_frontNoEx();
// Fetches the next event from m_event_queue, passing timeout_ms to
// pop_front(). If pop_front() throws ItemNotFoundException, an event whose
// type is CONNEVENT_NONE is prepared instead.
// NOTE(review): the opening of the try block, the declaration of `e` and the
// statement that hands it back are not visible in this listing -- presumably
// `e` is returned from the handler; confirm against the full file.
1563 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1566 return m_event_queue.pop_front(timeout_ms);
1567 } catch(ItemNotFoundException &ex){
1569 e.type = CONNEVENT_NONE;
1574 void Connection::putCommand(ConnectionCommand &c)
1576 m_command_queue.push_back(c);
// Public entry point taking the port to serve on.
// NOTE(review): only the declaration of the ConnectionCommand is visible in
// this listing; the lines that fill it in from `port` and pass it on
// (presumably through putCommand()) are not shown -- confirm.
1579 void Connection::Serve(unsigned short port)
1581 ConnectionCommand c;
// Public entry point taking the address to connect to.
// NOTE(review): only the declaration of the ConnectionCommand is visible in
// this listing; the lines that fill it in from `address` and pass it on
// (presumably through putCommand()) are not shown -- confirm.
1586 void Connection::Connect(Address address)
1588 ConnectionCommand c;
// Reports connection state by checking three conditions while holding
// m_peers_mutex: the peer map holds exactly one entry, an entry exists for
// PEER_ID_SERVER, and m_peer_id is not PEER_ID_INEXISTENT.
// NOTE(review): none of the return statements are visible in this listing --
// presumably each failed check yields false and passing all three yields
// true; confirm against the full file.
1593 bool Connection::Connected()
// Held for the rest of the function while m_peers is inspected.
1595 JMutexAutoLock peerlock(m_peers_mutex);
// Check 1: exactly one known peer.
1597 if(m_peers.size() != 1)
// Check 2: that peer is registered under the server's id.
1600 std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1601 if(node == m_peers.end())
// Check 3: our own peer id is still the "inexistent" placeholder.
1604 if(m_peer_id == PEER_ID_INEXISTENT)
// Public entry point for disconnecting.
// NOTE(review): only the declaration of the ConnectionCommand is visible in
// this listing; the lines that fill it in and pass it on (presumably through
// putCommand()) are not shown -- confirm.
1610 void Connection::Disconnect()
1612 ConnectionCommand c;
// Waits for a connection event and translates it for the caller.
//
// Outputs: on CONNEVENT_DATA_RECEIVED, peer_id and data are set from the
// event and the payload size is returned.
// Throws: NoIncomingDataException when the wait yields CONNEVENT_NONE;
// ConnectionBindFailed on CONNEVENT_BIND_FAILED.
// Peer-added / peer-removed events are forwarded to m_bc_peerhandler when one
// is set.
// NOTE(review): the loop/switch headers and the statements that end the two
// peer-event cases are not visible in this listing -- presumably those cases
// go back to waiting for the next event; confirm against the full file.
1617 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
// Wait for one event, using the configured receive timeout.
1620 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
// Log every event except the "nothing arrived" placeholder.
1621 if(e.type != CONNEVENT_NONE)
1622 dout_con<<getDesc()<<": Receive: got event: "
1623 <<e.describe()<<std::endl;
// Nothing arrived: reported to the caller as an exception.
1625 case CONNEVENT_NONE:
1626 throw NoIncomingDataException("No incoming data");
// Payload arrived: hand sender id and data to the caller, return its size.
1627 case CONNEVENT_DATA_RECEIVED:
1628 peer_id = e.peer_id;
1629 data = SharedBuffer<u8>(e.data);
1630 return e.data.getSize();
// A peer joined: notify the handler with a temporary Peer built from the event.
1631 case CONNEVENT_PEER_ADDED: {
1632 Peer tmp(e.peer_id, e.address);
1633 if(m_bc_peerhandler)
1634 m_bc_peerhandler->peerAdded(&tmp);
// A peer left: notify the handler, passing along the event's timeout flag.
1636 case CONNEVENT_PEER_REMOVED: {
1637 Peer tmp(e.peer_id, e.address);
1638 if(m_bc_peerhandler)
1639 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
// Socket bind failure is surfaced to the caller as an exception.
1641 case CONNEVENT_BIND_FAILED:
1642 throw ConnectionBindFailed("Failed to bind socket "
1643 "(port already in use?)");
// Final fallback with the same exception as the CONNEVENT_NONE case.
1646 throw NoIncomingDataException("No incoming data");
// Builds a send-to-all command for `data` on channel `channelnum`, with the
// given reliability flag.
// NOTE(review): the statement that passes the command on (presumably
// putCommand(c)) is not visible in this listing -- confirm.
1649 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
// The channel index must be below CHANNEL_COUNT.
1651 assert(channelnum < CHANNEL_COUNT);
1653 ConnectionCommand c;
1654 c.sendToAll(channelnum, data, reliable);
// Builds a send command for `data` addressed to `peer_id` on channel
// `channelnum`, with the given reliability flag.
// NOTE(review): the statement that passes the command on (presumably
// putCommand(c)) is not visible in this listing -- confirm.
1658 void Connection::Send(u16 peer_id, u8 channelnum,
1659 SharedBuffer<u8> data, bool reliable)
// The channel index must be below CHANNEL_COUNT.
1661 assert(channelnum < CHANNEL_COUNT);
1663 ConnectionCommand c;
1664 c.send(peer_id, channelnum, data, reliable);
// Timeout hook taking an elapsed-time value `dtime`.
// NOTE(review): no statement of the body appears in this listing, and the
// unit of dtime is not shown (presumably seconds) -- confirm against the
// full file.
1668 void Connection::RunTimeouts(float dtime)
1673 Address Connection::GetPeerAddress(u16 peer_id)
1675 JMutexAutoLock peerlock(m_peers_mutex);
1676 return getPeer(peer_id)->address;
1679 float Connection::GetPeerAvgRTT(u16 peer_id)
1681 JMutexAutoLock peerlock(m_peers_mutex);
1682 return getPeer(peer_id)->avg_rtt;
// Builds a delete-peer command for `peer_id`.
// NOTE(review): the statement that passes the command on (presumably
// putCommand(c)) is not visible in this listing -- confirm.
1685 void Connection::DeletePeer(u16 peer_id)
1687 ConnectionCommand c;
1688 c.deletePeer(peer_id);
1692 void Connection::PrintInfo(std::ostream &out)
1694 out<<getDesc()<<": ";
1697 void Connection::PrintInfo()
1699 PrintInfo(dout_con);
1702 std::string Connection::getDesc()
1704 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";