3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
33 static u16 readPeerId(u8 *packetdata)
35 return readU16(&packetdata[4]);
37 static u8 readChannel(u8 *packetdata)
39 return readU8(&packetdata[6]);
42 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
43 u32 protocol_id, u16 sender_peer_id, u8 channel)
45 u32 packet_size = datasize + BASE_HEADER_SIZE;
46 BufferedPacket p(packet_size);
49 writeU32(&p.data[0], protocol_id);
50 writeU16(&p.data[4], sender_peer_id);
51 writeU8(&p.data[6], channel);
53 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
58 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
59 u32 protocol_id, u16 sender_peer_id, u8 channel)
61 return makePacket(address, *data, data.getSize(),
62 protocol_id, sender_peer_id, channel);
65 SharedBuffer<u8> makeOriginalPacket(
66 SharedBuffer<u8> data)
69 u32 packet_size = data.getSize() + header_size;
70 SharedBuffer<u8> b(packet_size);
72 writeU8(&b[0], TYPE_ORIGINAL);
74 memcpy(&b[header_size], *data, data.getSize());
79 std::list<SharedBuffer<u8> > makeSplitPacket(
80 SharedBuffer<u8> data,
84 // Chunk packets, containing the TYPE_SPLIT header
85 std::list<SharedBuffer<u8> > chunks;
87 u32 chunk_header_size = 7;
88 u32 maximum_data_size = chunksize_max - chunk_header_size;
94 end = start + maximum_data_size - 1;
95 if(end > data.getSize() - 1)
96 end = data.getSize() - 1;
98 u32 payload_size = end - start + 1;
99 u32 packet_size = chunk_header_size + payload_size;
101 SharedBuffer<u8> chunk(packet_size);
103 writeU8(&chunk[0], TYPE_SPLIT);
104 writeU16(&chunk[1], seqnum);
105 // [3] u16 chunk_count is written at next stage
106 writeU16(&chunk[5], chunk_num);
107 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
109 chunks.push_back(chunk);
115 while(end != data.getSize() - 1);
117 for(std::list<SharedBuffer<u8> >::iterator i = chunks.begin();
118 i != chunks.end(); ++i)
121 writeU16(&((*i)[3]), chunk_count);
127 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
128 SharedBuffer<u8> data,
132 u32 original_header_size = 1;
133 std::list<SharedBuffer<u8> > list;
134 if(data.getSize() + original_header_size > chunksize_max)
136 list = makeSplitPacket(data, chunksize_max, split_seqnum);
142 list.push_back(makeOriginalPacket(data));
147 SharedBuffer<u8> makeReliablePacket(
148 SharedBuffer<u8> data,
151 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
152 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
153 <<((unsigned int)data[0]&0xff)<<std::endl;*/
155 u32 packet_size = data.getSize() + header_size;
156 SharedBuffer<u8> b(packet_size);
158 writeU8(&b[0], TYPE_RELIABLE);
159 writeU16(&b[1], seqnum);
161 memcpy(&b[header_size], *data, data.getSize());
163 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
164 <<((unsigned int)data[0]&0xff)<<std::endl;*/
165 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
173 ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {}
175 void ReliablePacketBuffer::print()
177 for(std::list<BufferedPacket>::iterator i = m_list.begin();
181 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185 bool ReliablePacketBuffer::empty()
187 return m_list.empty();
189 u32 ReliablePacketBuffer::size()
193 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
195 std::list<BufferedPacket>::iterator i = m_list.begin();
196 for(; i != m_list.end(); ++i)
198 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
199 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
200 <<", comparing to s="<<s<<std::endl;*/
206 RPBSearchResult ReliablePacketBuffer::notFound()
210 u16 ReliablePacketBuffer::getFirstSeqnum()
213 throw NotFoundException("Buffer is empty");
214 BufferedPacket p = *m_list.begin();
215 return readU16(&p.data[BASE_HEADER_SIZE+1]);
217 BufferedPacket ReliablePacketBuffer::popFirst()
220 throw NotFoundException("Buffer is empty");
221 BufferedPacket p = *m_list.begin();
222 m_list.erase(m_list.begin());
226 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
228 RPBSearchResult r = findPacket(seqnum);
230 dout_con<<"Not found"<<std::endl;
231 throw NotFoundException("seqnum not found in buffer");
233 BufferedPacket p = *r;
238 void ReliablePacketBuffer::insert(BufferedPacket &p)
240 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
241 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
242 assert(type == TYPE_RELIABLE);
243 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
246 // Find the right place for the packet and insert it there
248 // If list is empty, just add it
255 // Otherwise find the right place
256 std::list<BufferedPacket>::iterator i = m_list.begin();
257 // Find the first packet in the list which has a higher seqnum
258 for(; i != m_list.end(); ++i){
259 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
262 throw AlreadyExistsException("Same seqnum in list");
264 if(seqnum_higher(s, seqnum)){
268 // If we're at the end of the list, add the packet to the
270 if(i == m_list.end())
280 void ReliablePacketBuffer::incrementTimeouts(float dtime)
282 for(std::list<BufferedPacket>::iterator i = m_list.begin();
283 i != m_list.end(); ++i)
286 i->totaltime += dtime;
290 void ReliablePacketBuffer::resetTimedOuts(float timeout)
292 for(std::list<BufferedPacket>::iterator i = m_list.begin();
293 i != m_list.end(); ++i)
295 if(i->time >= timeout)
300 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
302 for(std::list<BufferedPacket>::iterator i = m_list.begin();
303 i != m_list.end(); ++i)
305 if(i->totaltime >= timeout)
311 std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
313 std::list<BufferedPacket> timed_outs;
314 for(std::list<BufferedPacket>::iterator i = m_list.begin();
315 i != m_list.end(); ++i)
317 if(i->time >= timeout)
318 timed_outs.push_back(*i);
327 IncomingSplitBuffer::~IncomingSplitBuffer()
329 for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
330 i != m_buf.end(); ++i)
336 This will throw a GotSplitPacketException when a full
337 split packet is constructed.
339 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
341 u32 headersize = BASE_HEADER_SIZE + 7;
342 assert(p.data.getSize() >= headersize);
343 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
344 assert(type == TYPE_SPLIT);
345 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
346 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
347 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
349 // Add if doesn't exist
350 if(m_buf.find(seqnum) == m_buf.end())
352 IncomingSplitPacket *sp = new IncomingSplitPacket();
353 sp->chunk_count = chunk_count;
354 sp->reliable = reliable;
358 IncomingSplitPacket *sp = m_buf[seqnum];
360 // TODO: These errors should be thrown or something? Dunno.
361 if(chunk_count != sp->chunk_count)
362 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
363 <<" != sp->chunk_count="<<sp->chunk_count
365 if(reliable != sp->reliable)
366 derr_con<<"Connection: WARNING: reliable="<<reliable
367 <<" != sp->reliable="<<sp->reliable
370 // If chunk already exists, ignore it.
371 // Sometimes two identical packets may arrive when there is network
372 // lag and the server re-sends stuff.
373 if(sp->chunks.find(chunk_num) != sp->chunks.end())
374 return SharedBuffer<u8>();
376 // Cut chunk data out of packet
377 u32 chunkdatasize = p.data.getSize() - headersize;
378 SharedBuffer<u8> chunkdata(chunkdatasize);
379 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
381 // Set chunk data in buffer
382 sp->chunks[chunk_num] = chunkdata;
384 // If not all chunks are received, return empty buffer
385 if(sp->allReceived() == false)
386 return SharedBuffer<u8>();
388 // Calculate total size
390 for(std::map<u16, SharedBuffer<u8> >::iterator i = sp->chunks.begin();
391 i != sp->chunks.end(); ++i)
393 totalsize += i->second.getSize();
396 SharedBuffer<u8> fulldata(totalsize);
398 // Copy chunks to data buffer
400 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
403 SharedBuffer<u8> buf = sp->chunks[chunk_i];
404 u16 chunkdatasize = buf.getSize();
405 memcpy(&fulldata[start], *buf, chunkdatasize);
406 start += chunkdatasize;;
409 // Remove sp from buffer
415 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
417 std::list<u16> remove_queue;
418 for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
419 i != m_buf.end(); ++i)
421 IncomingSplitPacket *p = i->second;
422 // Reliable ones are not removed by timeout
423 if(p->reliable == true)
426 if(p->time >= timeout)
427 remove_queue.push_back(i->first);
429 for(std::list<u16>::iterator j = remove_queue.begin();
430 j != remove_queue.end(); ++j)
432 dout_con<<"NOTE: Removing timed out unreliable split packet"
445 next_outgoing_seqnum = SEQNUM_INITIAL;
446 next_incoming_seqnum = SEQNUM_INITIAL;
447 next_outgoing_split_seqnum = SEQNUM_INITIAL;
457 Peer::Peer(u16 a_id, Address a_address):
460 timeout_counter(0.0),
464 has_sent_with_id(false),
466 m_max_packets_per_second(10),
469 congestion_control_aim_rtt(0.2),
470 congestion_control_max_rate(400),
471 congestion_control_min_rate(10)
478 void Peer::reportRTT(float rtt)
482 if(m_max_packets_per_second < congestion_control_max_rate)
483 m_max_packets_per_second += 10;
484 } else if(rtt < congestion_control_aim_rtt){
485 if(m_max_packets_per_second < congestion_control_max_rate)
486 m_max_packets_per_second += 2;
488 m_max_packets_per_second *= 0.8;
489 if(m_max_packets_per_second < congestion_control_min_rate)
490 m_max_packets_per_second = congestion_control_min_rate;
496 else if(avg_rtt < 0.0)
499 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
501 // Calculate resend_timeout
503 /*int reliable_count = 0;
504 for(int i=0; i<CHANNEL_COUNT; i++)
506 reliable_count += channels[i].outgoing_reliables.size();
508 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
509 * ((float)reliable_count * 1);*/
511 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
512 if(timeout < RESEND_TIMEOUT_MIN)
513 timeout = RESEND_TIMEOUT_MIN;
514 if(timeout > RESEND_TIMEOUT_MAX)
515 timeout = RESEND_TIMEOUT_MAX;
516 resend_timeout = timeout;
523 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
525 m_protocol_id(protocol_id),
526 m_max_packet_size(max_packet_size),
530 m_bc_peerhandler(NULL),
531 m_bc_receive_timeout(0),
534 m_socket.setTimeoutMs(5);
539 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
540 bool ipv6, PeerHandler *peerhandler):
541 m_protocol_id(protocol_id),
542 m_max_packet_size(max_packet_size),
546 m_bc_peerhandler(peerhandler),
547 m_bc_receive_timeout(0),
550 m_socket.setTimeoutMs(5);
556 Connection::~Connection()
560 for(std::map<u16, Peer*>::iterator
562 j != m_peers.end(); ++j)
570 void * Connection::Thread()
573 log_register_thread("Connection");
575 dout_con<<"Connection thread started"<<std::endl;
577 u32 curtime = porting::getTimeMs();
578 u32 lasttime = curtime;
582 BEGIN_DEBUG_EXCEPTION_HANDLER
585 curtime = porting::getTimeMs();
586 float dtime = (float)(curtime - lasttime) / 1000.;
594 while(!m_command_queue.empty()){
595 ConnectionCommand c = m_command_queue.pop_front();
603 END_DEBUG_EXCEPTION_HANDLER(derr_con);
609 void Connection::putEvent(ConnectionEvent &e)
611 assert(e.type != CONNEVENT_NONE);
612 m_event_queue.push_back(e);
615 void Connection::processCommand(ConnectionCommand &c)
619 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
622 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
626 case CONNCMD_CONNECT:
627 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
630 case CONNCMD_DISCONNECT:
631 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
635 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
636 send(c.peer_id, c.channelnum, c.data, c.reliable);
638 case CONNCMD_SEND_TO_ALL:
639 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
640 sendToAll(c.channelnum, c.data, c.reliable);
642 case CONNCMD_DELETE_PEER:
643 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
644 deletePeer(c.peer_id, false);
649 void Connection::send(float dtime)
651 for(std::map<u16, Peer*>::iterator
653 j != m_peers.end(); ++j)
655 Peer *peer = j->second;
656 peer->m_sendtime_accu += dtime;
657 peer->m_num_sent = 0;
658 peer->m_max_num_sent = peer->m_sendtime_accu *
659 peer->m_max_packets_per_second;
661 Queue<OutgoingPacket> postponed_packets;
662 while(!m_outgoing_queue.empty()){
663 OutgoingPacket packet = m_outgoing_queue.pop_front();
664 Peer *peer = getPeerNoEx(packet.peer_id);
667 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
668 postponed_packets.push_back(packet);
669 } else if(peer->m_num_sent < peer->m_max_num_sent){
670 rawSendAsPacket(packet.peer_id, packet.channelnum,
671 packet.data, packet.reliable);
674 postponed_packets.push_back(packet);
677 while(!postponed_packets.empty()){
678 m_outgoing_queue.push_back(postponed_packets.pop_front());
680 for(std::map<u16, Peer*>::iterator
682 j != m_peers.end(); ++j)
684 Peer *peer = j->second;
685 peer->m_sendtime_accu -= (float)peer->m_num_sent /
686 peer->m_max_packets_per_second;
687 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
688 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
692 // Receive packets from the network and buffers and create ConnectionEvents
693 void Connection::receive()
695 u32 datasize = m_max_packet_size * 2; // Double it just to be safe
696 // TODO: We can not know how many layers of header there are.
697 // For now, just assume there are no other than the base headers.
698 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
699 SharedBuffer<u8> packetdata(packet_maxsize);
701 bool single_wait_done = false;
706 /* Check if some buffer has relevant data */
709 SharedBuffer<u8> resultdata;
710 bool got = getFromBuffers(peer_id, resultdata);
713 e.dataReceived(peer_id, resultdata);
719 if(single_wait_done){
720 if(m_socket.WaitData(0) == false)
724 single_wait_done = true;
727 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
729 if(received_size < 0)
731 if(received_size < BASE_HEADER_SIZE)
733 if(readU32(&packetdata[0]) != m_protocol_id)
736 u16 peer_id = readPeerId(*packetdata);
737 u8 channelnum = readChannel(*packetdata);
738 if(channelnum > CHANNEL_COUNT-1){
740 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
741 throw InvalidIncomingDataException("Channel doesn't exist");
744 if(peer_id == PEER_ID_INEXISTENT)
747 Somebody is trying to send stuff to us with no peer id.
749 Check if the same address and port was added to our peer
751 Allow only entries that have has_sent_with_id==false.
754 std::map<u16, Peer*>::iterator j;
756 for(; j != m_peers.end(); ++j)
758 Peer *peer = j->second;
759 if(peer->has_sent_with_id)
761 if(peer->address == sender)
766 If no peer was found with the same address and port,
767 we shall assume it is a new peer and create an entry.
769 if(j == m_peers.end())
771 // Pass on to adding the peer
773 // Else: A peer was found.
776 Peer *peer = j->second;
779 derr_con<<"WARNING: Assuming unknown peer to be "
780 <<"peer_id="<<peer_id<<std::endl;
785 The peer was not found in our lists. Add it.
787 if(peer_id == PEER_ID_INEXISTENT)
789 // Somebody wants to make a new connection
791 // Get a unique peer id (2 or higher)
794 Find an unused peer id
796 bool out_of_ids = false;
800 if(m_peers.find(peer_id_new) == m_peers.end())
802 // Check for overflow
803 if(peer_id_new == 65535){
810 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
815 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
816 " giving peer_id="<<peer_id_new<<std::endl;
819 Peer *peer = new Peer(peer_id_new, sender);
820 m_peers[peer->id] = peer;
822 // Create peer addition event
824 e.peerAdded(peer_id_new, sender);
827 // Create CONTROL packet to tell the peer id to the new peer.
828 SharedBuffer<u8> reply(4);
829 writeU8(&reply[0], TYPE_CONTROL);
830 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
831 writeU16(&reply[2], peer_id_new);
832 sendAsPacket(peer_id_new, 0, reply, true);
834 // We're now talking to a valid peer_id
835 peer_id = peer_id_new;
837 // Go on and process whatever it sent
840 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
842 if(node == m_peers.end())
845 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
846 // and it is invalid.
848 derr_con<<"Receive(): Peer not found"<<std::endl;
849 throw InvalidIncomingDataException("Peer not found (possible timeout)");
852 Peer *peer = node->second;
854 // Validate peer address
855 if(peer->address != sender)
858 derr_con<<"Peer "<<peer_id<<" sending from different address."
859 " Ignoring."<<std::endl;
863 peer->timeout_counter = 0.0;
865 Channel *channel = &(peer->channels[channelnum]);
867 // Throw the received packet to channel->processPacket()
869 // Make a new SharedBuffer from the data without the base headers
870 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
871 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
872 strippeddata.getSize());
875 // Process it (the result is some data with no headers made by us)
876 SharedBuffer<u8> resultdata = processPacket
877 (channel, strippeddata, peer_id, channelnum, false);
880 dout_con<<"ProcessPacket returned data of size "
881 <<resultdata.getSize()<<std::endl;
884 e.dataReceived(peer_id, resultdata);
887 }catch(ProcessedSilentlyException &e){
889 }catch(InvalidIncomingDataException &e){
891 catch(ProcessedSilentlyException &e){
896 void Connection::runTimeouts(float dtime)
898 float congestion_control_aim_rtt
899 = g_settings->getFloat("congestion_control_aim_rtt");
900 float congestion_control_max_rate
901 = g_settings->getFloat("congestion_control_max_rate");
902 float congestion_control_min_rate
903 = g_settings->getFloat("congestion_control_min_rate");
905 std::list<u16> timeouted_peers;
906 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
907 j != m_peers.end(); ++j)
909 Peer *peer = j->second;
911 // Update congestion control values
912 peer->congestion_control_aim_rtt = congestion_control_aim_rtt;
913 peer->congestion_control_max_rate = congestion_control_max_rate;
914 peer->congestion_control_min_rate = congestion_control_min_rate;
919 peer->timeout_counter += dtime;
920 if(peer->timeout_counter > m_timeout)
923 derr_con<<"RunTimeouts(): Peer "<<peer->id
925 <<" (source=peer->timeout_counter)"
927 // Add peer to the list
928 timeouted_peers.push_back(peer->id);
929 // Don't bother going through the buffers of this one
933 float resend_timeout = peer->resend_timeout;
934 for(u16 i=0; i<CHANNEL_COUNT; i++)
936 std::list<BufferedPacket> timed_outs;
938 Channel *channel = &peer->channels[i];
940 // Remove timed out incomplete unreliable split packets
941 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
943 // Increment reliable packet times
944 channel->outgoing_reliables.incrementTimeouts(dtime);
946 // Check reliable packet total times, remove peer if
948 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
951 derr_con<<"RunTimeouts(): Peer "<<peer->id
953 <<" (source=reliable packet totaltime)"
955 // Add peer to the to-be-removed list
956 timeouted_peers.push_back(peer->id);
960 // Re-send timed out outgoing reliables
962 timed_outs = channel->
963 outgoing_reliables.getTimedOuts(resend_timeout);
965 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
967 for(std::list<BufferedPacket>::iterator j = timed_outs.begin();
968 j != timed_outs.end(); ++j)
970 u16 peer_id = readPeerId(*(j->data));
971 u8 channel = readChannel(*(j->data));
972 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
975 derr_con<<"RE-SENDING timed-out RELIABLE to ";
976 j->address.print(&derr_con);
977 derr_con<<"(t/o="<<resend_timeout<<"): "
978 <<"from_peer_id="<<peer_id
979 <<", channel="<<((int)channel&0xff)
980 <<", seqnum="<<seqnum
985 // Enlarge avg_rtt and resend_timeout:
986 // The rtt will be at least the timeout.
987 // NOTE: This won't affect the timeout of the next
988 // checked channel because it was cached.
989 peer->reportRTT(resend_timeout);
996 peer->ping_timer += dtime;
997 if(peer->ping_timer >= 5.0)
999 // Create and send PING packet
1000 SharedBuffer<u8> data(2);
1001 writeU8(&data[0], TYPE_CONTROL);
1002 writeU8(&data[1], CONTROLTYPE_PING);
1003 rawSendAsPacket(peer->id, 0, data, true);
1005 peer->ping_timer = 0.0;
1012 // Remove timed out peers
1013 for(std::list<u16>::iterator i = timeouted_peers.begin();
1014 i != timeouted_peers.end(); ++i)
1016 PrintInfo(derr_con);
1017 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1018 deletePeer(*i, true);
1022 void Connection::serve(u16 port)
1024 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1026 m_socket.Bind(port);
1027 m_peer_id = PEER_ID_SERVER;
1029 catch(SocketException &e){
1037 void Connection::connect(Address address)
1039 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1040 <<":"<<address.getPort()<<std::endl;
1042 std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1043 if(node != m_peers.end()){
1044 throw ConnectionException("Already connected to a server");
1047 Peer *peer = new Peer(PEER_ID_SERVER, address);
1048 m_peers[peer->id] = peer;
1052 e.peerAdded(peer->id, peer->address);
1057 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1058 m_peer_id = PEER_ID_INEXISTENT;
1059 SharedBuffer<u8> data(0);
1060 Send(PEER_ID_SERVER, 0, data, true);
1063 void Connection::disconnect()
1065 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1067 // Create and send DISCO packet
1068 SharedBuffer<u8> data(2);
1069 writeU8(&data[0], TYPE_CONTROL);
1070 writeU8(&data[1], CONTROLTYPE_DISCO);
1073 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1074 j != m_peers.end(); ++j)
1076 Peer *peer = j->second;
1077 rawSendAsPacket(peer->id, 0, data, false);
1081 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1083 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1084 j != m_peers.end(); ++j)
1086 Peer *peer = j->second;
1087 send(peer->id, channelnum, data, reliable);
1091 void Connection::send(u16 peer_id, u8 channelnum,
1092 SharedBuffer<u8> data, bool reliable)
1094 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1096 assert(channelnum < CHANNEL_COUNT);
1098 Peer *peer = getPeerNoEx(peer_id);
1101 Channel *channel = &(peer->channels[channelnum]);
1103 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1105 chunksize_max -= RELIABLE_HEADER_SIZE;
1107 std::list<SharedBuffer<u8> > originals;
1108 originals = makeAutoSplitPacket(data, chunksize_max,
1109 channel->next_outgoing_split_seqnum);
1111 for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
1112 i != originals.end(); ++i)
1114 SharedBuffer<u8> original = *i;
1116 sendAsPacket(peer_id, channelnum, original, reliable);
1120 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1121 SharedBuffer<u8> data, bool reliable)
1123 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1124 m_outgoing_queue.push_back(packet);
1127 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1128 SharedBuffer<u8> data, bool reliable)
1130 Peer *peer = getPeerNoEx(peer_id);
1133 Channel *channel = &(peer->channels[channelnum]);
1137 u16 seqnum = channel->next_outgoing_seqnum;
1138 channel->next_outgoing_seqnum++;
1140 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1142 // Add base headers and make a packet
1143 BufferedPacket p = makePacket(peer->address, reliable,
1144 m_protocol_id, m_peer_id, channelnum);
1147 // Buffer the packet
1148 channel->outgoing_reliables.insert(p);
1150 catch(AlreadyExistsException &e)
1152 PrintInfo(derr_con);
1153 derr_con<<"WARNING: Going to send a reliable packet "
1154 "seqnum="<<seqnum<<" that is already "
1155 "in outgoing buffer"<<std::endl;
1164 // Add base headers and make a packet
1165 BufferedPacket p = makePacket(peer->address, data,
1166 m_protocol_id, m_peer_id, channelnum);
1173 void Connection::rawSend(const BufferedPacket &packet)
1176 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1177 } catch(SendFailedException &e){
1178 derr_con<<"Connection::rawSend(): SendFailedException: "
1179 <<packet.address.serializeString()<<std::endl;
1183 Peer* Connection::getPeer(u16 peer_id)
1185 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1187 if(node == m_peers.end()){
1188 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1192 assert(node->second->id == peer_id);
1194 return node->second;
1197 Peer* Connection::getPeerNoEx(u16 peer_id)
1199 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1201 if(node == m_peers.end()){
1206 assert(node->second->id == peer_id);
1208 return node->second;
1211 std::list<Peer*> Connection::getPeers()
1213 std::list<Peer*> list;
1214 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1215 j != m_peers.end(); ++j)
1217 Peer *peer = j->second;
1218 list.push_back(peer);
1223 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1225 for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1226 j != m_peers.end(); ++j)
1228 Peer *peer = j->second;
1229 for(u16 i=0; i<CHANNEL_COUNT; i++)
1231 Channel *channel = &peer->channels[i];
1232 SharedBuffer<u8> resultdata;
1233 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1243 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1244 SharedBuffer<u8> &dst)
1246 u16 firstseqnum = 0;
1247 // Clear old packets from start of buffer
1250 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1251 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1252 channel->incoming_reliables.popFirst();
1256 // This happens if all packets are old
1257 }catch(con::NotFoundException)
1260 if(channel->incoming_reliables.empty() == false)
1262 if(firstseqnum == channel->next_incoming_seqnum)
1264 BufferedPacket p = channel->incoming_reliables.popFirst();
1266 peer_id = readPeerId(*p.data);
1267 u8 channelnum = readChannel(*p.data);
1268 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1271 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1272 <<" seqnum="<<seqnum
1273 <<" peer_id="<<peer_id
1274 <<" channel="<<((int)channelnum&0xff)
1277 channel->next_incoming_seqnum++;
1279 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1280 // Get out the inside packet and re-process it
1281 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1282 memcpy(*payload, &p.data[headers_size], payload.getSize());
1284 dst = processPacket(channel, payload, peer_id, channelnum, true);
1291 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1292 SharedBuffer<u8> packetdata, u16 peer_id,
1293 u8 channelnum, bool reliable)
1295 IndentationRaiser iraiser(&(m_indentation));
1297 if(packetdata.getSize() < 1)
1298 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1300 u8 type = readU8(&packetdata[0]);
1302 if(type == TYPE_CONTROL)
1304 if(packetdata.getSize() < 2)
1305 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1307 u8 controltype = readU8(&packetdata[1]);
1309 if(controltype == CONTROLTYPE_ACK)
1311 if(packetdata.getSize() < 4)
1312 throw InvalidIncomingDataException
1313 ("packetdata.getSize() < 4 (ACK header size)");
1315 u16 seqnum = readU16(&packetdata[2]);
1317 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1318 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1319 <<", seqnum="<<seqnum<<std::endl;
1322 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1323 // Get round trip time
1324 float rtt = p.totaltime;
1326 // Let peer calculate stuff according to it
1327 // (avg_rtt and resend_timeout)
1328 Peer *peer = getPeer(peer_id);
1329 peer->reportRTT(rtt);
1331 //PrintInfo(dout_con);
1332 //dout_con<<"RTT = "<<rtt<<std::endl;
1334 /*dout_con<<"OUTGOING: ";
1336 channel->outgoing_reliables.print();
1337 dout_con<<std::endl;*/
1339 catch(NotFoundException &e){
1340 PrintInfo(derr_con);
1341 derr_con<<"WARNING: ACKed packet not "
1346 throw ProcessedSilentlyException("Got an ACK");
1348 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1350 if(packetdata.getSize() < 4)
1351 throw InvalidIncomingDataException
1352 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1353 u16 peer_id_new = readU16(&packetdata[2]);
1355 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1357 if(GetPeerID() != PEER_ID_INEXISTENT)
1359 PrintInfo(derr_con);
1360 derr_con<<"WARNING: Not changing"
1361 " existing peer id."<<std::endl;
1365 dout_con<<"changing."<<std::endl;
1366 SetPeerID(peer_id_new);
1368 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1370 else if(controltype == CONTROLTYPE_PING)
1372 // Just ignore it, the incoming data already reset
1373 // the timeout counter
1375 dout_con<<"PING"<<std::endl;
1376 throw ProcessedSilentlyException("Got a PING");
1378 else if(controltype == CONTROLTYPE_DISCO)
1380 // Just ignore it, the incoming data already reset
1381 // the timeout counter
1383 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1385 if(deletePeer(peer_id, false) == false)
1387 PrintInfo(derr_con);
1388 derr_con<<"DISCO: Peer not found"<<std::endl;
1391 throw ProcessedSilentlyException("Got a DISCO");
1394 PrintInfo(derr_con);
1395 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1396 <<((int)controltype&0xff)<<std::endl;
1397 throw InvalidIncomingDataException("Invalid control type");
1400 else if(type == TYPE_ORIGINAL)
1402 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1403 throw InvalidIncomingDataException
1404 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1406 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1408 // Get the inside packet out and return it
1409 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1410 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1413 else if(type == TYPE_SPLIT)
1415 // We have to create a packet again for buffering
1416 // This isn't actually too bad an idea.
1417 BufferedPacket packet = makePacket(
1418 getPeer(peer_id)->address,
1423 // Buffer the packet
1424 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1425 if(data.getSize() != 0)
1428 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1429 <<"size="<<data.getSize()<<std::endl;
1433 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1434 throw ProcessedSilentlyException("Buffered a split packet chunk");
1436 else if(type == TYPE_RELIABLE)
1438 // Recursive reliable packets not allowed
1439 assert(reliable == false);
1441 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1442 throw InvalidIncomingDataException
1443 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1445 u16 seqnum = readU16(&packetdata[1]);
1447 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1448 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1451 if(is_future_packet)
1452 dout_con<<"BUFFERING";
1453 else if(is_old_packet)
1457 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1458 <<" next="<<channel->next_incoming_seqnum;
1459 dout_con<<" [sending CONTROLTYPE_ACK"
1460 " to peer_id="<<peer_id<<"]";
1461 dout_con<<std::endl;
1464 //assert(channel->incoming_reliables.size() < 100);
1466 // Send a CONTROLTYPE_ACK
1467 SharedBuffer<u8> reply(4);
1468 writeU8(&reply[0], TYPE_CONTROL);
1469 writeU8(&reply[1], CONTROLTYPE_ACK);
1470 writeU16(&reply[2], seqnum);
1471 rawSendAsPacket(peer_id, channelnum, reply, false);
1473 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1474 if(is_future_packet)
1477 dout_con<<"Buffering reliable packet (seqnum="
1478 <<seqnum<<")"<<std::endl;*/
1480 // This one comes later, buffer it.
1481 // Actually we have to make a packet to buffer one.
1482 // Well, we have all the ingredients, so just do it.
1483 BufferedPacket packet = makePacket(
1484 getPeer(peer_id)->address,
1490 channel->incoming_reliables.insert(packet);
1493 dout_con<<"INCOMING: ";
1494 channel->incoming_reliables.print();
1495 dout_con<<std::endl;*/
1497 catch(AlreadyExistsException &e)
1501 throw ProcessedSilentlyException("Buffered future reliable packet");
1503 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1504 else if(is_old_packet)
1506 // An old packet, dump it
1507 throw InvalidIncomingDataException("Got an old reliable packet");
1510 channel->next_incoming_seqnum++;
1512 // Get out the inside packet and re-process it
1513 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1514 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1516 return processPacket(channel, payload, peer_id, channelnum, true);
1520 PrintInfo(derr_con);
1521 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1522 throw InvalidIncomingDataException("Invalid packet type");
1525 // We should never get here.
1526 // If you get here, add an exception or a return to some of the
1527 // above conditionals.
1529 throw BaseException("Error in Channel::ProcessPacket()");
1532 bool Connection::deletePeer(u16 peer_id, bool timeout)
1534 if(m_peers.find(peer_id) == m_peers.end())
1537 Peer *peer = m_peers[peer_id];
1541 e.peerRemoved(peer_id, timeout, peer->address);
1544 delete m_peers[peer_id];
1545 m_peers.erase(peer_id);
1551 ConnectionEvent Connection::getEvent()
1553 if(m_event_queue.empty()){
1555 e.type = CONNEVENT_NONE;
1558 return m_event_queue.pop_front();
1561 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1564 return m_event_queue.pop_front(timeout_ms);
1565 } catch(ItemNotFoundException &ex){
1567 e.type = CONNEVENT_NONE;
1572 void Connection::putCommand(ConnectionCommand &c)
1574 m_command_queue.push_back(c);
1577 void Connection::Serve(unsigned short port)
1579 ConnectionCommand c;
1584 void Connection::Connect(Address address)
1586 ConnectionCommand c;
1591 bool Connection::Connected()
1593 JMutexAutoLock peerlock(m_peers_mutex);
1595 if(m_peers.size() != 1)
1598 std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1599 if(node == m_peers.end())
1602 if(m_peer_id == PEER_ID_INEXISTENT)
1608 void Connection::Disconnect()
1610 ConnectionCommand c;
1615 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1618 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1619 if(e.type != CONNEVENT_NONE)
1620 dout_con<<getDesc()<<": Receive: got event: "
1621 <<e.describe()<<std::endl;
1623 case CONNEVENT_NONE:
1624 throw NoIncomingDataException("No incoming data");
1625 case CONNEVENT_DATA_RECEIVED:
1626 peer_id = e.peer_id;
1627 data = SharedBuffer<u8>(e.data);
1628 return e.data.getSize();
1629 case CONNEVENT_PEER_ADDED: {
1630 Peer tmp(e.peer_id, e.address);
1631 if(m_bc_peerhandler)
1632 m_bc_peerhandler->peerAdded(&tmp);
1634 case CONNEVENT_PEER_REMOVED: {
1635 Peer tmp(e.peer_id, e.address);
1636 if(m_bc_peerhandler)
1637 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1639 case CONNEVENT_BIND_FAILED:
1640 throw ConnectionBindFailed("Failed to bind socket "
1641 "(port already in use?)");
1644 throw NoIncomingDataException("No incoming data");
1647 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1649 assert(channelnum < CHANNEL_COUNT);
1651 ConnectionCommand c;
1652 c.sendToAll(channelnum, data, reliable);
1656 void Connection::Send(u16 peer_id, u8 channelnum,
1657 SharedBuffer<u8> data, bool reliable)
1659 assert(channelnum < CHANNEL_COUNT);
1661 ConnectionCommand c;
1662 c.send(peer_id, channelnum, data, reliable);
1666 void Connection::RunTimeouts(float dtime)
1671 Address Connection::GetPeerAddress(u16 peer_id)
1673 JMutexAutoLock peerlock(m_peers_mutex);
1674 return getPeer(peer_id)->address;
1677 float Connection::GetPeerAvgRTT(u16 peer_id)
1679 JMutexAutoLock peerlock(m_peers_mutex);
1680 return getPeer(peer_id)->avg_rtt;
1683 void Connection::DeletePeer(u16 peer_id)
1685 ConnectionCommand c;
1686 c.deletePeer(peer_id);
1690 void Connection::PrintInfo(std::ostream &out)
1692 out<<getDesc()<<": ";
1695 void Connection::PrintInfo()
1697 PrintInfo(dout_con);
1700 std::string Connection::getDesc()
1702 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";