3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
// Reads the sender peer id from a raw packet: the u16 stored at byte
// offset 4 (the offset at which makePacket() writes sender_peer_id).
33 static u16 readPeerId(u8 *packetdata)
35 return readU16(&packetdata[4]);
// Reads the channel number from a raw packet: the u8 stored at byte
// offset 6 (the offset at which makePacket() writes channel).
37 static u8 readChannel(u8 *packetdata)
39 return readU8(&packetdata[6]);
// Builds a BufferedPacket of datasize + BASE_HEADER_SIZE bytes: the base
// header followed by a copy of the datasize payload bytes at data.
// Base header layout as written below:
//   [0] u32 protocol_id
//   [4] u16 sender_peer_id
//   [6] u8  channel
// NOTE(review): how `address` is stored and the return statement are not
// visible in this excerpt.
42 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
43 u32 protocol_id, u16 sender_peer_id, u8 channel)
45 u32 packet_size = datasize + BASE_HEADER_SIZE;
46 BufferedPacket p(packet_size);
// Base header fields
49 writeU32(&p.data[0], protocol_id);
50 writeU16(&p.data[4], sender_peer_id);
51 writeU8(&p.data[6], channel);
// The payload starts right after the base header
53 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
// Convenience overload: forwards the SharedBuffer's data pointer (*data)
// and size to the pointer/size makePacket() overload.
58 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
59 u32 protocol_id, u16 sender_peer_id, u8 channel)
61 return makePacket(address, *data, data.getSize(),
62 protocol_id, sender_peer_id, channel);
// Wraps data in a TYPE_ORIGINAL packet: the type byte at [0], then a copy
// of the payload starting at header_size.
// NOTE(review): header_size is declared on a line not visible in this
// excerpt (makeAutoSplitPacket() uses 1 for the original header size).
65 SharedBuffer<u8> makeOriginalPacket(
66 SharedBuffer<u8> data)
69 u32 packet_size = data.getSize() + header_size;
70 SharedBuffer<u8> b(packet_size);
72 writeU8(&b[0], TYPE_ORIGINAL);
74 memcpy(&b[header_size], *data, data.getSize());
// Cuts data into TYPE_SPLIT chunk packets, each carrying at most
// chunksize_max - 7 payload bytes, and returns them as a list.
// Chunk header layout (7 bytes) as written below:
//   [0] u8  TYPE_SPLIT
//   [1] u16 seqnum
//   [3] u16 chunk_count (filled in by the second pass)
//   [5] u16 chunk_num
// NOTE(review): the remaining parameters and the declarations of start, end
// and chunk_num are on lines not visible in this excerpt.
79 core::list<SharedBuffer<u8> > makeSplitPacket(
80 SharedBuffer<u8> data,
84 // Chunk packets, containing the TYPE_SPLIT header
85 core::list<SharedBuffer<u8> > chunks;
87 u32 chunk_header_size = 7;
88 u32 maximum_data_size = chunksize_max - chunk_header_size;
// start/end are inclusive byte indices into data for the current chunk;
// end is clamped to the last byte of data.
// NOTE(review): data.getSize() - 1 presumably wraps around for empty data
// if getSize() is unsigned; confirm callers never pass an empty buffer.
93 end = start + maximum_data_size - 1;
94 if(end > data.getSize() - 1)
95 end = data.getSize() - 1;
97 u32 payload_size = end - start + 1;
98 u32 packet_size = chunk_header_size + payload_size;
100 SharedBuffer<u8> chunk(packet_size);
102 writeU8(&chunk[0], TYPE_SPLIT);
103 writeU16(&chunk[1], seqnum);
104 // [3] u16 chunk_count is written at next stage
105 writeU16(&chunk[5], chunk_num);
106 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
108 chunks.push_back(chunk);
// Keep chunking until the chunk just produced ends on the last byte
113 while(end != data.getSize() - 1);
// Second pass: the total chunk count is now known, so write it into every
// chunk header
115 u16 chunk_count = chunks.getSize();
117 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
118 for(; i != chunks.end(); i++)
121 writeU16(&((*i)[3]), chunk_count);
// Returns the packet(s) needed to carry data: if data plus the 1-byte
// original header does not fit into chunksize_max, the list holds the
// TYPE_SPLIT chunks from makeSplitPacket(); otherwise it holds a single
// packet from makeOriginalPacket().
// NOTE(review): the remaining parameters (chunksize_max, split_seqnum) are
// declared on lines not visible in this excerpt.
127 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
128 SharedBuffer<u8> data,
132 u32 original_header_size = 1;
133 core::list<SharedBuffer<u8> > list;
134 if(data.getSize() + original_header_size > chunksize_max)
136 list = makeSplitPacket(data, chunksize_max, split_seqnum);
142 list.push_back(makeOriginalPacket(data));
// Wraps data in a TYPE_RELIABLE packet.
// Header as written below: [0] u8 TYPE_RELIABLE, [1] u16 seqnum; the
// payload is copied starting at header_size.
// NOTE(review): the seqnum parameter and header_size are declared on lines
// not visible in this excerpt.
147 SharedBuffer<u8> makeReliablePacket(
148 SharedBuffer<u8> data,
151 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
152 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
153 <<((unsigned int)data[0]&0xff)<<std::endl;*/
155 u32 packet_size = data.getSize() + header_size;
156 SharedBuffer<u8> b(packet_size);
158 writeU8(&b[0], TYPE_RELIABLE);
159 writeU16(&b[1], seqnum);
161 memcpy(&b[header_size], *data, data.getSize());
163 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
164 <<((unsigned int)data[0]&0xff)<<std::endl;*/
165 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
// Walks m_list and reads each buffered packet's reliable seqnum (the u16 at
// BASE_HEADER_SIZE+1).
// NOTE(review): the iterator initialisation and the statement that outputs
// `s` are not visible in this excerpt.
173 void ReliablePacketBuffer::print()
175 core::list<BufferedPacket>::Iterator i;
177 for(; i != m_list.end(); i++)
179 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
// Returns true when no packets are buffered in m_list.
183 bool ReliablePacketBuffer::empty()
185 return m_list.empty();
// Returns the number of packets currently buffered in m_list.
187 u32 ReliablePacketBuffer::size()
189 return m_list.getSize();
// Scans m_list, reading each packet's reliable seqnum (the u16 at
// BASE_HEADER_SIZE+1) while searching for `seqnum`.
// NOTE(review): the comparison against seqnum and the return statement are
// not visible in this excerpt.
191 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
193 core::list<BufferedPacket>::Iterator i;
195 for(; i != m_list.end(); i++)
197 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
198 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
199 <<", comparing to s="<<s<<std::endl;*/
// NOTE(review): only the signature is visible in this excerpt. Presumably
// returns the RPBSearchResult value that findPacket() yields when nothing
// matches - confirm against the full file.
205 RPBSearchResult ReliablePacketBuffer::notFound()
// Returns the reliable seqnum (the u16 at BASE_HEADER_SIZE+1) of the first
// packet in m_list.
// Throws NotFoundException("Buffer is empty"); the guarding condition is on
// a line not visible in this excerpt.
209 u16 ReliablePacketBuffer::getFirstSeqnum()
212 throw NotFoundException("Buffer is empty");
// Works on a copy of the first packet; the list itself is left untouched
213 BufferedPacket p = *m_list.begin();
214 return readU16(&p.data[BASE_HEADER_SIZE+1]);
// Takes a copy of the first packet in m_list and obtains an iterator to it.
// Throws NotFoundException("Buffer is empty"); the guarding condition is on
// a line not visible in this excerpt.
// NOTE(review): the erase and return statements are not visible here.
216 BufferedPacket ReliablePacketBuffer::popFirst()
219 throw NotFoundException("Buffer is empty");
220 BufferedPacket p = *m_list.begin();
221 core::list<BufferedPacket>::Iterator i = m_list.begin();
// Looks up the packet with the given seqnum via findPacket() and copies it
// out of the search result.
// Throws NotFoundException("seqnum not found in buffer") after logging
// "Not found"; the guarding condition is on a line not visible here.
// NOTE(review): the erase and return statements are not visible here.
225 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
227 RPBSearchResult r = findPacket(seqnum);
229 dout_con<<"Not found"<<std::endl;
230 throw NotFoundException("seqnum not found in buffer");
232 BufferedPacket p = *r;
// Inserts a TYPE_RELIABLE packet into m_list, positioned before the first
// buffered packet whose seqnum is reported by seqnum_higher() as higher.
// Asserts that p holds at least BASE_HEADER_SIZE+3 bytes and that its type
// byte (at BASE_HEADER_SIZE+0) is TYPE_RELIABLE.
// Throws AlreadyExistsException("Same seqnum in list"); the equality check
// guarding the throw is on a line not visible in this excerpt.
236 void ReliablePacketBuffer::insert(BufferedPacket &p)
238 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
239 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
240 assert(type == TYPE_RELIABLE);
241 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
243 // Find the right place for the packet and insert it there
245 // If list is empty, just add it
252 // Otherwise find the right place
253 core::list<BufferedPacket>::Iterator i;
255 // Find the first packet in the list which has a higher seqnum
256 for(; i != m_list.end(); i++){
257 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
259 throw AlreadyExistsException("Same seqnum in list");
261 if(seqnum_higher(s, seqnum)){
265 // If we're at the end of the list, add the packet to the
267 if(i == m_list.end())
274 m_list.insert_before(i, p);
// Adds dtime to the totaltime of every buffered packet.
// NOTE(review): one statement in the loop body is not visible in this
// excerpt (presumably it advances the per-packet `time` that
// resetTimedOuts()/getTimedOuts() compare against - confirm).
277 void ReliablePacketBuffer::incrementTimeouts(float dtime)
279 core::list<BufferedPacket>::Iterator i;
281 for(; i != m_list.end(); i++){
283 i->totaltime += dtime;
// Visits every buffered packet whose `time` has reached `timeout`.
// NOTE(review): the statement executed for those packets is not visible in
// this excerpt (presumably it resets `time` - confirm).
287 void ReliablePacketBuffer::resetTimedOuts(float timeout)
289 core::list<BufferedPacket>::Iterator i;
291 for(; i != m_list.end(); i++){
292 if(i->time >= timeout)
// Checks the buffered packets for one whose totaltime has reached `timeout`.
// NOTE(review): the return statements are not visible in this excerpt.
297 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
299 core::list<BufferedPacket>::Iterator i;
301 for(; i != m_list.end(); i++){
302 if(i->totaltime >= timeout)
// Collects copies of all buffered packets whose `time` has reached
// `timeout` into a new list; m_list itself is not modified by the visible
// code.
308 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
310 core::list<BufferedPacket> timed_outs;
311 core::list<BufferedPacket>::Iterator i;
313 for(; i != m_list.end(); i++)
315 if(i->time >= timeout)
316 timed_outs.push_back(*i);
// Deletes every IncomingSplitPacket still stored in m_buf (they are
// allocated with new in insert()).
325 IncomingSplitBuffer::~IncomingSplitBuffer()
327 core::map<u16, IncomingSplitPacket*>::Iterator i;
328 i = m_buf.getIterator();
329 for(; i.atEnd() == false; i++)
331 delete i.getNode()->getValue();
335 Returns the reassembled data once every chunk of a split packet has
336 been received; returns an empty buffer otherwise.
// Stores one TYPE_SPLIT chunk and, once all chunks of its split packet are
// present, returns the concatenated payload.
//   p        - full packet, base header included; the split header (type,
//              seqnum, chunk_count, chunk_num) starts at BASE_HEADER_SIZE
//   reliable - recorded on the IncomingSplitPacket when it is created
// Returns an empty SharedBuffer when the chunk is a duplicate or when chunks
// are still missing.
// NOTE(review): the size and type checks are asserts; confirm callers
// validate packets that come from the network before calling this.
338 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
340 u32 headersize = BASE_HEADER_SIZE + 7;
341 assert(p.data.getSize() >= headersize);
342 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
343 assert(type == TYPE_SPLIT);
344 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
345 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
346 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
348 // Add if doesn't exist
349 if(m_buf.find(seqnum) == NULL)
351 IncomingSplitPacket *sp = new IncomingSplitPacket();
352 sp->chunk_count = chunk_count;
353 sp->reliable = reliable;
357 IncomingSplitPacket *sp = m_buf[seqnum];
// Mismatches against the values recorded for this seqnum are only logged
359 // TODO: These errors should be thrown or something? Dunno.
360 if(chunk_count != sp->chunk_count)
361 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
362 <<" != sp->chunk_count="<<sp->chunk_count
364 if(reliable != sp->reliable)
365 derr_con<<"Connection: WARNING: reliable="<<reliable
366 <<" != sp->reliable="<<sp->reliable
369 // If chunk already exists, ignore it.
370 // Sometimes two identical packets may arrive when there is network
371 // lag and the server re-sends stuff.
372 if(sp->chunks.find(chunk_num) != NULL)
373 return SharedBuffer<u8>();
375 // Cut chunk data out of packet
376 u32 chunkdatasize = p.data.getSize() - headersize;
377 SharedBuffer<u8> chunkdata(chunkdatasize);
378 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
380 // Set chunk data in buffer
381 sp->chunks[chunk_num] = chunkdata;
383 // If not all chunks are received, return empty buffer
384 if(sp->allReceived() == false)
385 return SharedBuffer<u8>();
387 // Calculate total size
389 core::map<u16, SharedBuffer<u8> >::Iterator i;
390 i = sp->chunks.getIterator();
391 for(; i.atEnd() == false; i++)
393 totalsize += i.getNode()->getValue().getSize();
396 SharedBuffer<u8> fulldata(totalsize);
// Chunks are concatenated in chunk index order 0..chunk_count-1
398 // Copy chunks to data buffer
400 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
403 SharedBuffer<u8> buf = sp->chunks[chunk_i];
404 u16 chunkdatasize = buf.getSize();
405 memcpy(&fulldata[start], *buf, chunkdatasize);
406 start += chunkdatasize;;
// NOTE(review): the visible lines do not show `sp` being deleted after it
// is removed from m_buf; confirm it is freed on a line missing from this
// excerpt.
409 // Remove sp from buffer
410 m_buf.remove(seqnum);
// Queues the seqnums of buffered split packets whose `time` has reached
// `timeout`, then walks that queue, logging each removal.
// NOTE(review): the statement guarded by the `reliable` check, any use of
// dtime, and the actual removal from m_buf are on lines not visible in this
// excerpt.
415 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
// Keys are collected first and processed in a second loop, after the
// iteration over m_buf has finished
417 core::list<u16> remove_queue;
418 core::map<u16, IncomingSplitPacket*>::Iterator i;
419 i = m_buf.getIterator();
420 for(; i.atEnd() == false; i++)
422 IncomingSplitPacket *p = i.getNode()->getValue();
423 // Reliable ones are not removed by timeout
424 if(p->reliable == true)
427 if(p->time >= timeout)
428 remove_queue.push_back(i.getNode()->getKey());
430 core::list<u16>::Iterator j;
431 j = remove_queue.begin();
432 for(; j != remove_queue.end(); j++)
434 dout_con<<"NOTE: Removing timed out unreliable split packet"
// Start all three sequence counters at SEQNUM_INITIAL.
// NOTE(review): the enclosing definition's header is not visible in this
// excerpt; presumably this is the Channel constructor, since these members
// are accessed through Channel pointers elsewhere in the file - confirm.
447 next_outgoing_seqnum = SEQNUM_INITIAL;
448 next_incoming_seqnum = SEQNUM_INITIAL;
449 next_outgoing_split_seqnum = SEQNUM_INITIAL;
// Constructs a peer from an id and an address and sets the initial timeout
// and congestion-control values.
// NOTE(review): several initialisers (including those that consume a_id and
// a_address) are on lines not visible in this excerpt.
459 Peer::Peer(u16 a_id, Address a_address):
462 timeout_counter(0.0),
466 has_sent_with_id(false),
// Initial send rate; reportRTT() adjusts it between the min and max rates
468 m_max_packets_per_second(10),
// Initial congestion-control parameters; Connection::runTimeouts()
// overwrites them from g_settings
471 congestion_control_aim_rtt(0.2),
472 congestion_control_max_rate(400),
473 congestion_control_min_rate(10)
// Feeds one round-trip-time sample into the peer's congestion control and
// recomputes avg_rtt and resend_timeout.
// NOTE(review): the first branch condition, the else introducing the *= 0.8
// step, and the branches around the avg_rtt update are on lines not visible
// in this excerpt.
480 void Peer::reportRTT(float rtt)
// Send-rate adjustment: +10 in the first branch, +2 when rtt is below the
// aimed rtt (both only while below the max rate), otherwise scaled by 0.8
// and kept at or above the min rate
484 if(m_max_packets_per_second < congestion_control_max_rate)
485 m_max_packets_per_second += 10;
486 } else if(rtt < congestion_control_aim_rtt){
487 if(m_max_packets_per_second < congestion_control_max_rate)
488 m_max_packets_per_second += 2;
490 m_max_packets_per_second *= 0.8;
491 if(m_max_packets_per_second < congestion_control_min_rate)
492 m_max_packets_per_second = congestion_control_min_rate;
498 else if(avg_rtt < 0.0)
// Smoothed average: 10% new sample, 90% previous average
501 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
503 // Calculate resend_timeout
505 /*int reliable_count = 0;
506 for(int i=0; i<CHANNEL_COUNT; i++)
508 reliable_count += channels[i].outgoing_reliables.size();
510 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
511 * ((float)reliable_count * 1);*/
// resend_timeout = avg_rtt * RESEND_TIMEOUT_FACTOR, clamped to
// [RESEND_TIMEOUT_MIN, RESEND_TIMEOUT_MAX]
513 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
514 if(timeout < RESEND_TIMEOUT_MIN)
515 timeout = RESEND_TIMEOUT_MIN;
516 if(timeout > RESEND_TIMEOUT_MAX)
517 timeout = RESEND_TIMEOUT_MAX;
518 resend_timeout = timeout;
// Constructs a connection without a peer handler (m_bc_peerhandler is NULL)
// and sets the socket timeout to 5 ms.
// NOTE(review): further initialisers (including the one consuming `timeout`)
// are on lines not visible in this excerpt.
525 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
526 m_protocol_id(protocol_id),
527 m_max_packet_size(max_packet_size),
530 m_bc_peerhandler(NULL),
531 m_bc_receive_timeout(0),
534 m_socket.setTimeoutMs(5);
// Same as the three-argument constructor, but stores the given PeerHandler
// in m_bc_peerhandler.
// NOTE(review): further initialisers (including the one consuming `timeout`)
// are on lines not visible in this excerpt.
539 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
540 PeerHandler *peerhandler):
541 m_protocol_id(protocol_id),
542 m_max_packet_size(max_packet_size),
545 m_bc_peerhandler(peerhandler),
546 m_bc_receive_timeout(0),
549 m_socket.setTimeoutMs(5);
// Walks every Peer stored in m_peers during teardown.
// NOTE(review): what is done with each peer (presumably delete) and any
// thread shutdown are on lines not visible in this excerpt.
555 Connection::~Connection()
559 for(core::map<u16, Peer*>::Iterator
560 j = m_peers.getIterator();
561 j.atEnd() == false; j++)
563 Peer *peer = j.getNode()->getValue();
// Connection thread entry point: registers the thread for logging, then,
// inside the debug exception handler macros, measures elapsed time and
// drains m_command_queue.
// NOTE(review): the main loop header and the calls made per iteration
// (e.g. per-command handling) are on lines not visible in this excerpt.
570 void * Connection::Thread()
573 log_register_thread("Connection");
575 dout_con<<"Connection thread started"<<std::endl;
577 u32 curtime = porting::getTimeMs();
578 u32 lasttime = curtime;
582 BEGIN_DEBUG_EXCEPTION_HANDLER
// dtime: time since lasttime, converted from milliseconds to seconds
585 curtime = porting::getTimeMs();
586 float dtime = (float)(curtime - lasttime) / 1000.;
// Pop queued commands until the queue is empty
594 while(m_command_queue.size() != 0){
595 ConnectionCommand c = m_command_queue.pop_front();
603 END_DEBUG_EXCEPTION_HANDLER(derr_con);
// Appends an event to m_event_queue; asserts that the event type is not
// CONNEVENT_NONE.
609 void Connection::putEvent(ConnectionEvent &e)
611 assert(e.type != CONNEVENT_NONE);
612 m_event_queue.push_back(e);
// Dispatches one queued ConnectionCommand: logs which command is being
// processed and calls the matching handler (send, sendToAll, deletePeer
// are visible below).
// NOTE(review): the switch header, several case labels, the handler calls
// for SERVE/CONNECT/DISCONNECT and the return statements are on lines not
// visible in this excerpt.
615 void Connection::processCommand(ConnectionCommand &c)
619 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
622 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
626 case CONNCMD_CONNECT:
627 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
630 case CONNCMD_DISCONNECT:
631 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
635 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
636 send(c.peer_id, c.channelnum, c.data, c.reliable);
638 case CONNCMD_SEND_TO_ALL:
639 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
640 sendToAll(c.channelnum, c.data, c.reliable);
642 case CONNCMD_DELETE_PEER:
643 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
644 deletePeer(c.peer_id, false);
// Rate-limited flush of m_outgoing_queue, called with the elapsed time.
// Each peer gets a budget of m_sendtime_accu * m_max_packets_per_second
// packets for this call. Queued packets are sent with rawSendAsPacket()
// while the peer is under budget; packets that cannot go out now are put
// back on m_outgoing_queue in their original order.
// NOTE(review): the handling of a NULL peer and the increment of m_num_sent
// are on lines not visible in this excerpt.
649 void Connection::send(float dtime)
// Pass 1: accumulate send time and compute each peer's packet budget
651 for(core::map<u16, Peer*>::Iterator
652 j = m_peers.getIterator();
653 j.atEnd() == false; j++)
655 Peer *peer = j.getNode()->getValue();
656 peer->m_sendtime_accu += dtime;
657 peer->m_num_sent = 0;
658 peer->m_max_num_sent = peer->m_sendtime_accu *
659 peer->m_max_packets_per_second;
// Pass 2: drain the queue, postponing what cannot be sent now
661 Queue<OutgoingPacket> postponed_packets;
662 while(m_outgoing_queue.size() != 0){
663 OutgoingPacket packet = m_outgoing_queue.pop_front();
664 Peer *peer = getPeerNoEx(packet.peer_id);
// Postpone while the channel already holds 5 or more outgoing reliables
667 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
668 postponed_packets.push_back(packet);
669 } else if(peer->m_num_sent < peer->m_max_num_sent){
670 rawSendAsPacket(packet.peer_id, packet.channelnum,
671 packet.data, packet.reliable);
674 postponed_packets.push_back(packet);
677 while(postponed_packets.size() != 0){
678 m_outgoing_queue.push_back(postponed_packets.pop_front());
// Pass 3: subtract the time used by the sent packets and cap the
// accumulated time at the equivalent of 10 packets
680 for(core::map<u16, Peer*>::Iterator
681 j = m_peers.getIterator();
682 j.atEnd() == false; j++)
684 Peer *peer = j.getNode()->getValue();
685 peer->m_sendtime_accu -= (float)peer->m_num_sent /
686 peer->m_max_packets_per_second;
687 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
688 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
// Processing order as far as visible below:
//   1. hand out data already waiting in the per-channel buffers
//      (getFromBuffers) as dataReceived events;
//   2. read a packet from m_socket and check its size and protocol id;
//   3. resolve the sending peer, creating a new Peer and replying with
//      CONTROLTYPE_SET_PEER_ID when the sender has no peer id yet;
//   4. strip the base header and pass the rest to processPacket().
// NOTE(review): the enclosing loop/try lines and most control-flow
// statements (continue/break/goto) are not visible in this excerpt.
692 // Receive packets from the network and buffers and create ConnectionEvents
693 void Connection::receive()
695 u32 datasize = m_max_packet_size * 2; // Double it just to be safe
696 // TODO: We can not know how many layers of header there are.
697 // For now, just assume there are no other than the base headers.
698 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
699 SharedBuffer<u8> packetdata(packet_maxsize);
701 bool single_wait_done = false;
706 /* Check if some buffer has relevant data */
709 SharedBuffer<u8> resultdata;
710 bool got = getFromBuffers(peer_id, resultdata);
713 e.dataReceived(peer_id, resultdata);
// After the first wait, only poll the socket without blocking (WaitData(0))
719 if(single_wait_done){
720 if(m_socket.WaitData(0) == false)
724 single_wait_done = true;
727 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
// Checks applied to the raw packet: non-negative size, at least a full
// base header, matching protocol id
729 if(received_size < 0)
731 if(received_size < BASE_HEADER_SIZE)
733 if(readU32(&packetdata[0]) != m_protocol_id)
736 u16 peer_id = readPeerId(*packetdata);
737 u8 channelnum = readChannel(*packetdata);
738 if(channelnum > CHANNEL_COUNT-1){
// NOTE(review): channelnum is a u8; if u8 is a character type this streams
// a character rather than a number - other log lines in this file cast via
// ((int)channel&0xff). Confirm.
740 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
741 throw InvalidIncomingDataException("Channel doesn't exist");
744 if(peer_id == PEER_ID_INEXISTENT)
747 Somebody is trying to send stuff to us with no peer id.
749 Check if the same address and port was added to our peer
751 Allow only entries that have has_sent_with_id==false.
754 core::map<u16, Peer*>::Iterator j;
755 j = m_peers.getIterator();
756 for(; j.atEnd() == false; j++)
758 Peer *peer = j.getNode()->getValue();
759 if(peer->has_sent_with_id)
761 if(peer->address == sender)
766 If no peer was found with the same address and port,
767 we shall assume it is a new peer and create an entry.
771 // Pass on to adding the peer
773 // Else: A peer was found.
776 Peer *peer = j.getNode()->getValue();
779 derr_con<<"WARNING: Assuming unknown peer to be "
780 <<"peer_id="<<peer_id<<std::endl;
785 The peer was not found in our lists. Add it.
787 if(peer_id == PEER_ID_INEXISTENT)
789 // Somebody wants to make a new connection
791 // Get a unique peer id (2 or higher)
794 Find an unused peer id
796 bool out_of_ids = false;
800 if(m_peers.find(peer_id_new) == NULL)
802 // Check for overflow
803 if(peer_id_new == 65535){
810 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
815 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
816 " giving peer_id="<<peer_id_new<<std::endl;
819 Peer *peer = new Peer(peer_id_new, sender);
820 m_peers.insert(peer->id, peer);
822 // Create peer addition event
824 e.peerAdded(peer_id_new, sender);
827 // Create CONTROL packet to tell the peer id to the new peer.
828 SharedBuffer<u8> reply(4);
829 writeU8(&reply[0], TYPE_CONTROL);
830 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
831 writeU16(&reply[2], peer_id_new);
832 sendAsPacket(peer_id_new, 0, reply, true);
834 // We're now talking to a valid peer_id
835 peer_id = peer_id_new;
837 // Go on and process whatever it sent
840 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
845 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
846 // and it is invalid.
848 derr_con<<"Receive(): Peer not found"<<std::endl;
849 throw InvalidIncomingDataException("Peer not found (possible timeout)");
852 Peer *peer = node->getValue();
854 // Validate peer address
855 if(peer->address != sender)
858 derr_con<<"Peer "<<peer_id<<" sending from different address."
859 " Ignoring."<<std::endl;
// Reset the peer's inactivity timer (runTimeouts() increments it)
863 peer->timeout_counter = 0.0;
865 Channel *channel = &(peer->channels[channelnum]);
867 // Throw the received packet to channel->processPacket()
869 // Make a new SharedBuffer from the data without the base headers
870 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
871 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
872 strippeddata.getSize());
875 // Process it (the result is some data with no headers made by us)
876 SharedBuffer<u8> resultdata = processPacket
877 (channel, strippeddata, peer_id, channelnum, false);
880 dout_con<<"ProcessPacket returned data of size "
881 <<resultdata.getSize()<<std::endl;
884 e.dataReceived(peer_id, resultdata);
// ProcessedSilentlyException and InvalidIncomingDataException are caught
// here; the handler bodies are not visible in this excerpt
887 }catch(ProcessedSilentlyException &e){
889 }catch(InvalidIncomingDataException &e){
891 catch(ProcessedSilentlyException &e){
// Periodic housekeeping, called with the elapsed time:
//   - refreshes each peer's congestion-control parameters from g_settings;
//   - queues peers for removal when their timeout_counter exceeds m_timeout
//     or any outgoing reliable's total time reaches m_timeout;
//   - re-sends outgoing reliables whose resend timeout has elapsed;
//   - sends a CONTROLTYPE_PING once ping_timer reaches 5.0;
//   - finally deletes the queued peers with deletePeer(id, true).
// NOTE(review): several control-flow statements (continue/break/goto) and
// the actual re-send call are on lines not visible in this excerpt.
896 void Connection::runTimeouts(float dtime)
898 float congestion_control_aim_rtt
899 = g_settings->getFloat("congestion_control_aim_rtt");
900 float congestion_control_max_rate
901 = g_settings->getFloat("congestion_control_max_rate");
902 float congestion_control_min_rate
903 = g_settings->getFloat("congestion_control_min_rate");
// Peers are collected here and deleted only after the loop over m_peers
905 core::list<u16> timeouted_peers;
906 core::map<u16, Peer*>::Iterator j;
907 j = m_peers.getIterator();
908 for(; j.atEnd() == false; j++)
910 Peer *peer = j.getNode()->getValue();
912 // Update congestion control values
913 peer->congestion_control_aim_rtt = congestion_control_aim_rtt;
914 peer->congestion_control_max_rate = congestion_control_max_rate;
915 peer->congestion_control_min_rate = congestion_control_min_rate;
920 peer->timeout_counter += dtime;
921 if(peer->timeout_counter > m_timeout)
924 derr_con<<"RunTimeouts(): Peer "<<peer->id
926 <<" (source=peer->timeout_counter)"
928 // Add peer to the list
929 timeouted_peers.push_back(peer->id);
930 // Don't bother going through the buffers of this one
// Cached so that reportRTT() calls below do not change the timeout used
// for the remaining channels of this peer
934 float resend_timeout = peer->resend_timeout;
935 for(u16 i=0; i<CHANNEL_COUNT; i++)
937 core::list<BufferedPacket> timed_outs;
// Note: this `j` shadows the peer-map iterator `j` of the outer loop
938 core::list<BufferedPacket>::Iterator j;
940 Channel *channel = &peer->channels[i];
942 // Remove timed out incomplete unreliable split packets
943 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
945 // Increment reliable packet times
946 channel->outgoing_reliables.incrementTimeouts(dtime);
948 // Check reliable packet total times, remove peer if
950 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
953 derr_con<<"RunTimeouts(): Peer "<<peer->id
955 <<" (source=reliable packet totaltime)"
957 // Add peer to the to-be-removed list
958 timeouted_peers.push_back(peer->id);
962 // Re-send timed out outgoing reliables
964 timed_outs = channel->
965 outgoing_reliables.getTimedOuts(resend_timeout);
967 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
969 j = timed_outs.begin();
970 for(; j != timed_outs.end(); j++)
972 u16 peer_id = readPeerId(*(j->data));
// Note: this u8 `channel` shadows the `Channel *channel` declared above
973 u8 channel = readChannel(*(j->data));
974 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
977 derr_con<<"RE-SENDING timed-out RELIABLE to ";
978 j->address.print(&derr_con);
979 derr_con<<"(t/o="<<resend_timeout<<"): "
980 <<"from_peer_id="<<peer_id
981 <<", channel="<<((int)channel&0xff)
982 <<", seqnum="<<seqnum
987 // Enlarge avg_rtt and resend_timeout:
988 // The rtt will be at least the timeout.
989 // NOTE: This won't affect the timeout of the next
990 // checked channel because it was cached.
991 peer->reportRTT(resend_timeout);
998 peer->ping_timer += dtime;
999 if(peer->ping_timer >= 5.0)
1001 // Create and send PING packet
1002 SharedBuffer<u8> data(2);
1003 writeU8(&data[0], TYPE_CONTROL);
1004 writeU8(&data[1], CONTROLTYPE_PING);
1005 rawSendAsPacket(peer->id, 0, data, true);
1007 peer->ping_timer = 0.0;
1014 // Remove timed out peers
1015 core::list<u16>::Iterator i = timeouted_peers.begin();
1016 for(; i != timeouted_peers.end(); i++)
1018 PrintInfo(derr_con);
1019 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1020 deletePeer(*i, true);
// Starts serving: binds m_socket to `port` and takes the PEER_ID_SERVER id.
// SocketException is caught here; the handler body is on lines not visible
// in this excerpt.
1024 void Connection::serve(u16 port)
1026 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1028 m_socket.Bind(port);
1029 m_peer_id = PEER_ID_SERVER;
1031 catch(SocketException &e){
// Connects to a server at `address`: registers it as peer PEER_ID_SERVER,
// raises a peerAdded event, and sends an empty reliable packet while our own
// id is still PEER_ID_INEXISTENT (receive() answers such packets with
// CONTROLTYPE_SET_PEER_ID).
// Throws ConnectionException("Already connected to a server"); the guarding
// condition is on a line not visible in this excerpt.
// NOTE(review): the final call is `Send` (capital S), which is not defined
// in the visible part of this file - confirm it is the intended overload.
1039 void Connection::connect(Address address)
1041 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1042 <<":"<<address.getPort()<<std::endl;
1044 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1046 throw ConnectionException("Already connected to a server");
1049 Peer *peer = new Peer(PEER_ID_SERVER, address);
1050 m_peers.insert(peer->id, peer);
1054 e.peerAdded(peer->id, peer->address);
1059 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1060 m_peer_id = PEER_ID_INEXISTENT;
1061 SharedBuffer<u8> data(0);
1062 Send(PEER_ID_SERVER, 0, data, true);
// Sends a CONTROLTYPE_DISCO control packet to every known peer on channel
// 0. It goes out immediately through rawSendAsPacket() with reliable=false,
// bypassing the outgoing queue.
1065 void Connection::disconnect()
1067 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1069 // Create and send DISCO packet
1070 SharedBuffer<u8> data(2);
1071 writeU8(&data[0], TYPE_CONTROL);
1072 writeU8(&data[1], CONTROLTYPE_DISCO);
1075 core::map<u16, Peer*>::Iterator j;
1076 j = m_peers.getIterator();
1077 for(; j.atEnd() == false; j++)
1079 Peer *peer = j.getNode()->getValue();
1080 rawSendAsPacket(peer->id, 0, data, false);
// Calls send(peer_id, channelnum, data, reliable) once for every peer in
// m_peers.
1084 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1086 core::map<u16, Peer*>::Iterator j;
1087 j = m_peers.getIterator();
1088 for(; j.atEnd() == false; j++)
1090 Peer *peer = j.getNode()->getValue();
1091 send(peer->id, channelnum, data, reliable);
// Queues user data for one peer: splits it into packets that fit the
// maximum packet size via makeAutoSplitPacket() and queues each resulting
// packet with sendAsPacket().
// Asserts channelnum < CHANNEL_COUNT.
// NOTE(review): the handling of a NULL peer and the condition guarding the
// RELIABLE_HEADER_SIZE subtraction are on lines not visible in this excerpt.
1095 void Connection::send(u16 peer_id, u8 channelnum,
1096 SharedBuffer<u8> data, bool reliable)
1098 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1100 assert(channelnum < CHANNEL_COUNT);
1102 Peer *peer = getPeerNoEx(peer_id);
1105 Channel *channel = &(peer->channels[channelnum]);
// Room left for payload after the base header (and the reliable header)
1107 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1109 chunksize_max -= RELIABLE_HEADER_SIZE;
1111 core::list<SharedBuffer<u8> > originals;
1112 originals = makeAutoSplitPacket(data, chunksize_max,
1113 channel->next_outgoing_split_seqnum);
1115 core::list<SharedBuffer<u8> >::Iterator i;
1116 i = originals.begin();
1117 for(; i != originals.end(); i++)
1119 SharedBuffer<u8> original = *i;
1121 sendAsPacket(peer_id, channelnum, original, reliable);
// Wraps the arguments in an OutgoingPacket and appends it to
// m_outgoing_queue; nothing is written to the socket here (send(float)
// drains the queue).
1125 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1126 SharedBuffer<u8> data, bool reliable)
1128 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1129 m_outgoing_queue.push_back(packet);
// Builds the final packet for one peer/channel. Two paths are visible
// below: one wraps `data` with makeReliablePacket(), consuming the channel's
// next_outgoing_seqnum, and buffers the packet in outgoing_reliables; the
// other adds only the base header.
// An AlreadyExistsException from the buffer insert is caught and logged as
// a warning.
// NOTE(review): the reliable/unreliable branch condition, the handling of a
// NULL peer and the actual send calls are on lines not visible in this
// excerpt.
1132 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1133 SharedBuffer<u8> data, bool reliable)
1135 Peer *peer = getPeerNoEx(peer_id);
1138 Channel *channel = &(peer->channels[channelnum]);
1142 u16 seqnum = channel->next_outgoing_seqnum;
1143 channel->next_outgoing_seqnum++;
// Note: this local `reliable` shadows the bool parameter of the same name
1145 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1147 // Add base headers and make a packet
1148 BufferedPacket p = makePacket(peer->address, reliable,
1149 m_protocol_id, m_peer_id, channelnum);
1152 // Buffer the packet
1153 channel->outgoing_reliables.insert(p);
1155 catch(AlreadyExistsException &e)
1157 PrintInfo(derr_con);
1158 derr_con<<"WARNING: Going to send a reliable packet "
1159 "seqnum="<<seqnum<<" that is already "
1160 "in outgoing buffer"<<std::endl;
1169 // Add base headers and make a packet
1170 BufferedPacket p = makePacket(peer->address, data,
1171 m_protocol_id, m_peer_id, channelnum);
// Writes a fully built packet to m_socket, addressed to packet.address.
// A SendFailedException is caught and logged with the destination address
// rather than propagated.
1178 void Connection::rawSend(const BufferedPacket &packet)
1181 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1182 } catch(SendFailedException &e){
1183 derr_con<<"Connection::rawSend(): SendFailedException: "
1184 <<packet.address.serializeString()<<std::endl;
// Looks up a peer by id in m_peers and returns it.
// Throws PeerNotFoundException; the guarding condition is on a line not
// visible in this excerpt.
// Asserts that the stored peer's id matches the requested key.
1188 Peer* Connection::getPeer(u16 peer_id)
1190 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1193 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1197 assert(node->getValue()->id == peer_id);
1199 return node->getValue();
// Looks up a peer by id in m_peers and returns it.
// NOTE(review): the not-found path is on lines not visible in this excerpt;
// presumably it returns NULL instead of throwing (contrast getPeer()) -
// confirm.
// Asserts that the stored peer's id matches the requested key.
1202 Peer* Connection::getPeerNoEx(u16 peer_id)
1204 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1211 assert(node->getValue()->id == peer_id);
1213 return node->getValue();
// Builds a list containing the Peer pointers of every entry in m_peers.
// NOTE(review): the return statement is not visible in this excerpt.
1216 core::list<Peer*> Connection::getPeers()
1218 core::list<Peer*> list;
1219 core::map<u16, Peer*>::Iterator j;
1220 j = m_peers.getIterator();
1221 for(; j.atEnd() == false; j++)
1223 Peer *peer = j.getNode()->getValue();
1224 list.push_back(peer);
// Asks checkIncomingBuffers() for buffered data on every channel of every
// peer.
//   peer_id - passed by reference to checkIncomingBuffers(), which writes
//             the sender's id
//   dst     - output buffer for the data found
// NOTE(review): the handling of `got` (copy into dst, return values) is on
// lines not visible in this excerpt.
1229 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1231 core::map<u16, Peer*>::Iterator j;
1232 j = m_peers.getIterator();
1233 for(; j.atEnd() == false; j++)
1235 Peer *peer = j.getNode()->getValue();
1236 for(u16 i=0; i<CHANNEL_COUNT; i++)
1238 Channel *channel = &peer->channels[i];
1239 SharedBuffer<u8> resultdata;
1240 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
// Delivers the next in-order reliable packet buffered on `channel`, if any.
// Packets older than next_incoming_seqnum are popped and discarded first.
// When the first remaining packet carries exactly next_incoming_seqnum, it
// is popped, the counter is advanced, the base and reliable headers are
// stripped, and the payload is run through processPacket() into dst;
// peer_id is set from the packet's base header.
// NOTE(review): the loop around the clean-up, the try opening and the
// return statements are on lines not visible in this excerpt.
1250 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1251 SharedBuffer<u8> &dst)
1253 u16 firstseqnum = 0;
1254 // Clear old packets from start of buffer
1257 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1258 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1259 channel->incoming_reliables.popFirst();
1263 // This happens if all packets are old
1264 }catch(con::NotFoundException)
1267 if(channel->incoming_reliables.empty() == false)
1269 if(firstseqnum == channel->next_incoming_seqnum)
1271 BufferedPacket p = channel->incoming_reliables.popFirst();
1273 peer_id = readPeerId(*p.data);
1274 u8 channelnum = readChannel(*p.data);
1275 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1278 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1279 <<" seqnum="<<seqnum
1280 <<" peer_id="<<peer_id
1281 <<" channel="<<((int)channelnum&0xff)
1284 channel->next_incoming_seqnum++;
1286 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1287 // Get out the inside packet and re-process it
1288 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1289 memcpy(*payload, &p.data[headers_size], payload.getSize());
1291 dst = processPacket(channel, payload, peer_id, channelnum, true);
1298 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1299 SharedBuffer<u8> packetdata, u16 peer_id,
1300 u8 channelnum, bool reliable)
1302 IndentationRaiser iraiser(&(m_indentation));
1304 if(packetdata.getSize() < 1)
1305 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1307 u8 type = readU8(&packetdata[0]);
1309 if(type == TYPE_CONTROL)
1311 if(packetdata.getSize() < 2)
1312 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1314 u8 controltype = readU8(&packetdata[1]);
1316 if(controltype == CONTROLTYPE_ACK)
1318 if(packetdata.getSize() < 4)
1319 throw InvalidIncomingDataException
1320 ("packetdata.getSize() < 4 (ACK header size)");
1322 u16 seqnum = readU16(&packetdata[2]);
1324 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1325 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1326 <<", seqnum="<<seqnum<<std::endl;
1329 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1330 // Get round trip time
1331 float rtt = p.totaltime;
1333 // Let peer calculate stuff according to it
1334 // (avg_rtt and resend_timeout)
1335 Peer *peer = getPeer(peer_id);
1336 peer->reportRTT(rtt);
1338 //PrintInfo(dout_con);
1339 //dout_con<<"RTT = "<<rtt<<std::endl;
1341 /*dout_con<<"OUTGOING: ";
1343 channel->outgoing_reliables.print();
1344 dout_con<<std::endl;*/
1346 catch(NotFoundException &e){
1347 PrintInfo(derr_con);
1348 derr_con<<"WARNING: ACKed packet not "
1353 throw ProcessedSilentlyException("Got an ACK");
1355 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1357 if(packetdata.getSize() < 4)
1358 throw InvalidIncomingDataException
1359 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1360 u16 peer_id_new = readU16(&packetdata[2]);
1362 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1364 if(GetPeerID() != PEER_ID_INEXISTENT)
1366 PrintInfo(derr_con);
1367 derr_con<<"WARNING: Not changing"
1368 " existing peer id."<<std::endl;
1372 dout_con<<"changing."<<std::endl;
1373 SetPeerID(peer_id_new);
1375 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1377 else if(controltype == CONTROLTYPE_PING)
1379 // Just ignore it, the incoming data already reset
1380 // the timeout counter
1382 dout_con<<"PING"<<std::endl;
1383 throw ProcessedSilentlyException("Got a PING");
1385 else if(controltype == CONTROLTYPE_DISCO)
1387 // Just ignore it, the incoming data already reset
1388 // the timeout counter
1390 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1392 if(deletePeer(peer_id, false) == false)
1394 PrintInfo(derr_con);
1395 derr_con<<"DISCO: Peer not found"<<std::endl;
1398 throw ProcessedSilentlyException("Got a DISCO");
1401 PrintInfo(derr_con);
1402 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1403 <<((int)controltype&0xff)<<std::endl;
1404 throw InvalidIncomingDataException("Invalid control type");
1407 else if(type == TYPE_ORIGINAL)
1409 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1410 throw InvalidIncomingDataException
1411 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1413 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1415 // Get the inside packet out and return it
1416 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1417 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1420 else if(type == TYPE_SPLIT)
1422 // We have to create a packet again for buffering
1423 // This isn't actually too bad an idea.
1424 BufferedPacket packet = makePacket(
1425 getPeer(peer_id)->address,
1430 // Buffer the packet
1431 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1432 if(data.getSize() != 0)
1435 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1436 <<"size="<<data.getSize()<<std::endl;
1440 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1441 throw ProcessedSilentlyException("Buffered a split packet chunk");
1443 else if(type == TYPE_RELIABLE)
1445 // Recursive reliable packets not allowed
1446 assert(reliable == false);
1448 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1449 throw InvalidIncomingDataException
1450 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1452 u16 seqnum = readU16(&packetdata[1]);
1454 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1455 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1458 if(is_future_packet)
1459 dout_con<<"BUFFERING";
1460 else if(is_old_packet)
1464 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1465 <<" next="<<channel->next_incoming_seqnum;
1466 dout_con<<" [sending CONTROLTYPE_ACK"
1467 " to peer_id="<<peer_id<<"]";
1468 dout_con<<std::endl;
1471 //assert(channel->incoming_reliables.size() < 100);
1473 // Send a CONTROLTYPE_ACK
1474 SharedBuffer<u8> reply(4);
1475 writeU8(&reply[0], TYPE_CONTROL);
1476 writeU8(&reply[1], CONTROLTYPE_ACK);
1477 writeU16(&reply[2], seqnum);
1478 rawSendAsPacket(peer_id, channelnum, reply, false);
1480 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1481 if(is_future_packet)
1484 dout_con<<"Buffering reliable packet (seqnum="
1485 <<seqnum<<")"<<std::endl;*/
1487 // This one comes later, buffer it.
1488 // Actually we have to make a packet to buffer one.
1489 // Well, we have all the ingredients, so just do it.
1490 BufferedPacket packet = makePacket(
1491 getPeer(peer_id)->address,
1497 channel->incoming_reliables.insert(packet);
1500 dout_con<<"INCOMING: ";
1501 channel->incoming_reliables.print();
1502 dout_con<<std::endl;*/
1504 catch(AlreadyExistsException &e)
1508 throw ProcessedSilentlyException("Buffered future reliable packet");
1510 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1511 else if(is_old_packet)
1513 // An old packet, dump it
1514 throw InvalidIncomingDataException("Got an old reliable packet");
1517 channel->next_incoming_seqnum++;
1519 // Get out the inside packet and re-process it
1520 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1521 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1523 return processPacket(channel, payload, peer_id, channelnum, true);
1527 PrintInfo(derr_con);
1528 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1529 throw InvalidIncomingDataException("Invalid packet type");
1532 // We should never get here.
1533 // If you get here, add an exception or a return to some of the
1534 // above conditionals.
1536 throw BaseException("Error in Channel::ProcessPacket()");
// Removes the peer identified by peer_id from this connection.
//
// Visible steps: check that peer_id is present in m_peers, fetch the Peer,
// fill a "peer removed" event with the id, the timeout flag and the peer's
// address, then free the Peer object and drop its entry from m_peers.
//
// peer_id: id of the peer to remove.
// timeout: forwarded unchanged into the peer-removed event.
//
// The DISCO handler in processPacket treats a false return value as
// "Peer not found".
// NOTE(review): the statement guarded by the lookup check, the declaration
// of `e`, what is done with `e` afterwards and the return statements are
// not visible in this excerpt -- confirm against the full file.
1539 bool Connection::deletePeer(u16 peer_id, bool timeout)
// find() yielding NULL means there is no entry for this id.
1541 if(m_peers.find(peer_id) == NULL)
1544 Peer *peer = m_peers[peer_id];
// The address is read from the Peer before the object is deleted below.
1548 e.peerRemoved(peer_id, timeout, peer->address);
// Free the Peer object first, then erase the (now dangling) map entry.
1551 delete m_peers[peer_id];
1552 m_peers.remove(peer_id);
// Fetches the next event from m_event_queue.
//
// When the queue is empty an event whose type is CONNEVENT_NONE is prepared;
// otherwise the front element of the queue is popped and returned.
// NOTE(review): the declaration of `e` and the statement returning it are
// not visible in this excerpt -- confirm against the full file.
1558 ConnectionEvent Connection::getEvent()
1560 if(m_event_queue.size() == 0){
// Placeholder event type used when nothing is queued.
1562 e.type = CONNEVENT_NONE;
1565 return m_event_queue.pop_front();
// Fetches the next event from m_event_queue, passing timeout_ms on to the
// queue's pop_front().
//
// If pop_front() throws ItemNotFoundException, an event whose type is
// CONNEVENT_NONE is prepared instead.
// NOTE(review): the `try` opener, the declaration of `e` and the statement
// returning it are not visible in this excerpt -- confirm against the full
// file.
1568 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1571 return m_event_queue.pop_front(timeout_ms);
// Presumably raised when no event arrived within the timeout -- TODO confirm.
1572 } catch(ItemNotFoundException &ex){
1574 e.type = CONNEVENT_NONE;
1579 void Connection::putCommand(ConnectionCommand &c)
1581 m_command_queue.push_back(c);
// Entry point taking the port to serve on.
// NOTE(review): only the command declaration is visible in this excerpt;
// presumably the command is filled in with `port` and handed to
// putCommand() -- confirm against the full file.
1584 void Connection::Serve(unsigned short port)
1586 ConnectionCommand c;
// Entry point taking the address to connect to.
// NOTE(review): only the command declaration is visible in this excerpt;
// presumably the command is filled in with `address` and handed to
// putCommand() -- confirm against the full file.
1591 void Connection::Connect(Address address)
1593 ConnectionCommand c;
// Reports a connected/not-connected state as a bool.
//
// Visible checks, all made while m_peers_mutex is held:
//  - m_peers must contain exactly one entry,
//  - PEER_ID_SERVER is looked up in m_peers,
//  - m_peer_id is compared against PEER_ID_INEXISTENT.
// NOTE(review): the return statements and the test on `node` are not
// visible in this excerpt; presumably each failing check returns false and
// the function returns true otherwise -- confirm against the full file.
1598 bool Connection::Connected()
1600 JMutexAutoLock peerlock(m_peers_mutex);
1602 if(m_peers.size() != 1)
// Look for the entry keyed by the server's peer id.
1605 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
// Checks whether our own peer id still holds the "inexistent" value.
1609 if(m_peer_id == PEER_ID_INEXISTENT)
// Entry point for disconnecting; takes no arguments.
// NOTE(review): only the command declaration is visible in this excerpt;
// presumably the command is set up as a disconnect request and handed to
// putCommand() -- confirm against the full file.
1615 void Connection::Disconnect()
1617 ConnectionCommand c;
// Receive interface built on top of the event queue.
//
// Calls waitEvent(m_bc_receive_timeout) and dispatches on the event type:
//  - CONNEVENT_NONE:          throws NoIncomingDataException.
//  - CONNEVENT_DATA_RECEIVED: stores the sender id in peer_id and the
//                             payload in data, returns the payload size.
//  - CONNEVENT_PEER_ADDED:    calls m_bc_peerhandler->peerAdded() (if a
//                             handler is set) with a temporary Peer built
//                             from the event's peer id and address.
//  - CONNEVENT_PEER_REMOVED:  calls m_bc_peerhandler->deletingPeer() (if a
//                             handler is set) with such a temporary Peer
//                             and the event's timeout flag.
//  - CONNEVENT_BIND_FAILED:   throws ConnectionBindFailed.
//
// peer_id: out-parameter, written only for received data.
// data:    out-parameter, written only for received data.
// NOTE(review): the loop/switch scaffolding and the statements ending each
// case are not visible in this excerpt -- confirm against the full file how
// control continues after the peer added/removed cases.
1622 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1625 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
// Only log events that actually carry something.
1626 if(e.type != CONNEVENT_NONE)
1627 dout_con<<getDesc()<<": Receive: got event: "
1628 <<e.describe()<<std::endl;
1630 case CONNEVENT_NONE:
1631 throw NoIncomingDataException("No incoming data");
1632 case CONNEVENT_DATA_RECEIVED:
1633 peer_id = e.peer_id;
1634 data = SharedBuffer<u8>(e.data);
1635 return e.data.getSize();
1636 case CONNEVENT_PEER_ADDED: {
// The handler is given a temporary Peer constructed from the event fields.
1637 Peer tmp(e.peer_id, e.address);
1638 if(m_bc_peerhandler)
1639 m_bc_peerhandler->peerAdded(&tmp);
1641 case CONNEVENT_PEER_REMOVED: {
// Same temporary-Peer approach as for the peer-added case.
1642 Peer tmp(e.peer_id, e.address);
1643 if(m_bc_peerhandler)
1644 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1646 case CONNEVENT_BIND_FAILED:
1647 throw ConnectionBindFailed("Failed to bind socket "
1648 "(port already in use?)");
// Final statement of the function: same exception as the CONNEVENT_NONE case.
1651 throw NoIncomingDataException("No incoming data");
// Builds a "send to all" command from the given channel, payload and
// reliability flag.
//
// channelnum: must be below CHANNEL_COUNT (asserted).
// data:       payload passed on to the command.
// reliable:   passed on to the command unchanged.
// NOTE(review): what is done with `c` after it is filled in is not visible
// in this excerpt; presumably it is handed to putCommand() -- confirm
// against the full file.
1654 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1656 assert(channelnum < CHANNEL_COUNT);
1658 ConnectionCommand c;
1659 c.sendToAll(channelnum, data, reliable);
// Builds a "send" command addressed to a single peer.
//
// peer_id:    id of the receiving peer, passed on to the command.
// channelnum: must be below CHANNEL_COUNT (asserted).
// data:       payload passed on to the command.
// reliable:   passed on to the command unchanged.
// NOTE(review): what is done with `c` after it is filled in is not visible
// in this excerpt; presumably it is handed to putCommand() -- confirm
// against the full file.
1663 void Connection::Send(u16 peer_id, u8 channelnum,
1664 SharedBuffer<u8> data, bool reliable)
1666 assert(channelnum < CHANNEL_COUNT);
1668 ConnectionCommand c;
1669 c.send(peer_id, channelnum, data, reliable);
// Takes a time step `dtime` as a float.
// NOTE(review): the body of this function is not visible in this excerpt,
// so nothing can be stated here about what it does with dtime -- confirm
// against the full file.
1673 void Connection::RunTimeouts(float dtime)
1678 Address Connection::GetPeerAddress(u16 peer_id)
1680 JMutexAutoLock peerlock(m_peers_mutex);
1681 return getPeer(peer_id)->address;
1684 float Connection::GetPeerAvgRTT(u16 peer_id)
1686 JMutexAutoLock peerlock(m_peers_mutex);
1687 return getPeer(peer_id)->avg_rtt;
// Builds a "delete peer" command for the given peer id.
// NOTE(review): what is done with `c` after it is filled in is not visible
// in this excerpt; presumably it is handed to putCommand() -- confirm
// against the full file.
1690 void Connection::DeletePeer(u16 peer_id)
1692 ConnectionCommand c;
1693 c.deletePeer(peer_id);
1697 void Connection::PrintInfo(std::ostream &out)
1699 out<<getDesc()<<": ";
1702 void Connection::PrintInfo()
1704 PrintInfo(dout_con);
1707 std::string Connection::getDesc()
1709 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";