3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
29 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
30 u32 protocol_id, u16 sender_peer_id, u8 channel)
32 u32 packet_size = datasize + BASE_HEADER_SIZE;
33 BufferedPacket p(packet_size);
36 writeU32(&p.data[0], protocol_id);
37 writeU16(&p.data[4], sender_peer_id);
38 writeU8(&p.data[6], channel);
40 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
45 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
46 u32 protocol_id, u16 sender_peer_id, u8 channel)
48 return makePacket(address, *data, data.getSize(),
49 protocol_id, sender_peer_id, channel);
52 SharedBuffer<u8> makeOriginalPacket(
53 SharedBuffer<u8> data)
56 u32 packet_size = data.getSize() + header_size;
57 SharedBuffer<u8> b(packet_size);
59 writeU8(&b[0], TYPE_ORIGINAL);
61 memcpy(&b[header_size], *data, data.getSize());
66 core::list<SharedBuffer<u8> > makeSplitPacket(
67 SharedBuffer<u8> data,
71 // Chunk packets, containing the TYPE_SPLIT header
72 core::list<SharedBuffer<u8> > chunks;
74 u32 chunk_header_size = 7;
75 u32 maximum_data_size = chunksize_max - chunk_header_size;
80 end = start + maximum_data_size - 1;
81 if(end > data.getSize() - 1)
82 end = data.getSize() - 1;
84 u32 payload_size = end - start + 1;
85 u32 packet_size = chunk_header_size + payload_size;
87 SharedBuffer<u8> chunk(packet_size);
89 writeU8(&chunk[0], TYPE_SPLIT);
90 writeU16(&chunk[1], seqnum);
91 // [3] u16 chunk_count is written at next stage
92 writeU16(&chunk[5], chunk_num);
93 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
95 chunks.push_back(chunk);
100 while(end != data.getSize() - 1);
102 u16 chunk_count = chunks.getSize();
104 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
105 for(; i != chunks.end(); i++)
108 writeU16(&((*i)[3]), chunk_count);
114 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
115 SharedBuffer<u8> data,
119 u32 original_header_size = 1;
120 core::list<SharedBuffer<u8> > list;
121 if(data.getSize() + original_header_size > chunksize_max)
123 list = makeSplitPacket(data, chunksize_max, split_seqnum);
129 list.push_back(makeOriginalPacket(data));
134 SharedBuffer<u8> makeReliablePacket(
135 SharedBuffer<u8> data,
138 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
139 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
140 <<((unsigned int)data[0]&0xff)<<std::endl;*/
142 u32 packet_size = data.getSize() + header_size;
143 SharedBuffer<u8> b(packet_size);
145 writeU8(&b[0], TYPE_RELIABLE);
146 writeU16(&b[1], seqnum);
148 memcpy(&b[header_size], *data, data.getSize());
150 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
151 <<((unsigned int)data[0]&0xff)<<std::endl;*/
152 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
160 void ReliablePacketBuffer::print()
162 core::list<BufferedPacket>::Iterator i;
164 for(; i != m_list.end(); i++)
166 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
170 bool ReliablePacketBuffer::empty()
172 return m_list.empty();
174 u32 ReliablePacketBuffer::size()
176 return m_list.getSize();
178 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
180 core::list<BufferedPacket>::Iterator i;
182 for(; i != m_list.end(); i++)
184 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
186 <<", comparing to s="<<s<<std::endl;*/
192 RPBSearchResult ReliablePacketBuffer::notFound()
196 u16 ReliablePacketBuffer::getFirstSeqnum()
199 throw NotFoundException("Buffer is empty");
200 BufferedPacket p = *m_list.begin();
201 return readU16(&p.data[BASE_HEADER_SIZE+1]);
203 BufferedPacket ReliablePacketBuffer::popFirst()
206 throw NotFoundException("Buffer is empty");
207 BufferedPacket p = *m_list.begin();
208 core::list<BufferedPacket>::Iterator i = m_list.begin();
212 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
214 RPBSearchResult r = findPacket(seqnum);
216 dout_con<<"Not found"<<std::endl;
217 throw NotFoundException("seqnum not found in buffer");
219 BufferedPacket p = *r;
223 void ReliablePacketBuffer::insert(BufferedPacket &p)
225 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
226 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
227 assert(type == TYPE_RELIABLE);
228 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
230 // Find the right place for the packet and insert it there
232 // If list is empty, just add it
239 // Otherwise find the right place
240 core::list<BufferedPacket>::Iterator i;
242 // Find the first packet in the list which has a higher seqnum
243 for(; i != m_list.end(); i++){
244 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
246 throw AlreadyExistsException("Same seqnum in list");
248 if(seqnum_higher(s, seqnum)){
252 // If we're at the end of the list, add the packet to the
254 if(i == m_list.end())
261 m_list.insert_before(i, p);
264 void ReliablePacketBuffer::incrementTimeouts(float dtime)
266 core::list<BufferedPacket>::Iterator i;
268 for(; i != m_list.end(); i++){
270 i->totaltime += dtime;
274 void ReliablePacketBuffer::resetTimedOuts(float timeout)
276 core::list<BufferedPacket>::Iterator i;
278 for(; i != m_list.end(); i++){
279 if(i->time >= timeout)
284 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
286 core::list<BufferedPacket>::Iterator i;
288 for(; i != m_list.end(); i++){
289 if(i->totaltime >= timeout)
295 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
297 core::list<BufferedPacket> timed_outs;
298 core::list<BufferedPacket>::Iterator i;
300 for(; i != m_list.end(); i++)
302 if(i->time >= timeout)
303 timed_outs.push_back(*i);
312 IncomingSplitBuffer::~IncomingSplitBuffer()
314 core::map<u16, IncomingSplitPacket*>::Iterator i;
315 i = m_buf.getIterator();
316 for(; i.atEnd() == false; i++)
318 delete i.getNode()->getValue();
322 This will throw a GotSplitPacketException when a full
323 split packet is constructed.
325 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
327 u32 headersize = BASE_HEADER_SIZE + 7;
328 assert(p.data.getSize() >= headersize);
329 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
330 assert(type == TYPE_SPLIT);
331 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
332 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
333 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
335 // Add if doesn't exist
336 if(m_buf.find(seqnum) == NULL)
338 IncomingSplitPacket *sp = new IncomingSplitPacket();
339 sp->chunk_count = chunk_count;
340 sp->reliable = reliable;
344 IncomingSplitPacket *sp = m_buf[seqnum];
346 // TODO: These errors should be thrown or something? Dunno.
347 if(chunk_count != sp->chunk_count)
348 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
349 <<" != sp->chunk_count="<<sp->chunk_count
351 if(reliable != sp->reliable)
352 derr_con<<"Connection: WARNING: reliable="<<reliable
353 <<" != sp->reliable="<<sp->reliable
356 // If chunk already exists, cancel
357 if(sp->chunks.find(chunk_num) != NULL)
358 throw AlreadyExistsException("Chunk already in buffer");
360 // Cut chunk data out of packet
361 u32 chunkdatasize = p.data.getSize() - headersize;
362 SharedBuffer<u8> chunkdata(chunkdatasize);
363 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
365 // Set chunk data in buffer
366 sp->chunks[chunk_num] = chunkdata;
368 // If not all chunks are received, return empty buffer
369 if(sp->allReceived() == false)
370 return SharedBuffer<u8>();
372 // Calculate total size
374 core::map<u16, SharedBuffer<u8> >::Iterator i;
375 i = sp->chunks.getIterator();
376 for(; i.atEnd() == false; i++)
378 totalsize += i.getNode()->getValue().getSize();
381 SharedBuffer<u8> fulldata(totalsize);
383 // Copy chunks to data buffer
385 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
388 SharedBuffer<u8> buf = sp->chunks[chunk_i];
389 u16 chunkdatasize = buf.getSize();
390 memcpy(&fulldata[start], *buf, chunkdatasize);
391 start += chunkdatasize;;
394 // Remove sp from buffer
395 m_buf.remove(seqnum);
400 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
402 core::list<u16> remove_queue;
403 core::map<u16, IncomingSplitPacket*>::Iterator i;
404 i = m_buf.getIterator();
405 for(; i.atEnd() == false; i++)
407 IncomingSplitPacket *p = i.getNode()->getValue();
408 // Reliable ones are not removed by timeout
409 if(p->reliable == true)
412 if(p->time >= timeout)
413 remove_queue.push_back(i.getNode()->getKey());
415 core::list<u16>::Iterator j;
416 j = remove_queue.begin();
417 for(; j != remove_queue.end(); j++)
419 dout_con<<"NOTE: Removing timed out unreliable split packet"
432 next_outgoing_seqnum = SEQNUM_INITIAL;
433 next_incoming_seqnum = SEQNUM_INITIAL;
434 next_outgoing_split_seqnum = SEQNUM_INITIAL;
444 Peer::Peer(u16 a_id, Address a_address):
447 timeout_counter(0.0),
451 has_sent_with_id(false),
453 m_max_packets_per_second(10),
462 void Peer::reportRTT(float rtt)
466 if(m_max_packets_per_second < 100)
467 m_max_packets_per_second += 10;
468 } else if(rtt < 0.2){
469 if(m_max_packets_per_second < 100)
470 m_max_packets_per_second += 2;
472 if(m_max_packets_per_second > 5)
473 m_max_packets_per_second *= 0.5;
479 else if(avg_rtt < 0.0)
482 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
484 // Calculate resend_timeout
486 /*int reliable_count = 0;
487 for(int i=0; i<CHANNEL_COUNT; i++)
489 reliable_count += channels[i].outgoing_reliables.size();
491 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
492 * ((float)reliable_count * 1);*/
494 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
495 if(timeout < RESEND_TIMEOUT_MIN)
496 timeout = RESEND_TIMEOUT_MIN;
497 if(timeout > RESEND_TIMEOUT_MAX)
498 timeout = RESEND_TIMEOUT_MAX;
499 resend_timeout = timeout;
506 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
507 m_protocol_id(protocol_id),
508 m_max_packet_size(max_packet_size),
511 m_bc_peerhandler(NULL),
512 m_bc_receive_timeout(0),
515 m_socket.setTimeoutMs(5);
520 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
521 PeerHandler *peerhandler):
522 m_protocol_id(protocol_id),
523 m_max_packet_size(max_packet_size),
526 m_bc_peerhandler(peerhandler),
527 m_bc_receive_timeout(0),
530 m_socket.setTimeoutMs(5);
536 Connection::~Connection()
543 void * Connection::Thread()
546 log_register_thread("Connection");
548 dout_con<<"Connection thread started"<<std::endl;
550 u32 curtime = porting::getTimeMs();
551 u32 lasttime = curtime;
555 BEGIN_DEBUG_EXCEPTION_HANDLER
558 curtime = porting::getTimeMs();
559 float dtime = (float)(curtime - lasttime) / 1000.;
567 while(m_command_queue.size() != 0){
568 ConnectionCommand c = m_command_queue.pop_front();
576 END_DEBUG_EXCEPTION_HANDLER(derr_con);
582 void Connection::putEvent(ConnectionEvent &e)
584 assert(e.type != CONNEVENT_NONE);
585 m_event_queue.push_back(e);
588 void Connection::processCommand(ConnectionCommand &c)
592 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
595 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
599 case CONNCMD_CONNECT:
600 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
603 case CONNCMD_DISCONNECT:
604 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
608 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
609 send(c.peer_id, c.channelnum, c.data, c.reliable);
611 case CONNCMD_SEND_TO_ALL:
612 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
613 sendToAll(c.channelnum, c.data, c.reliable);
615 case CONNCMD_DELETE_PEER:
616 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
617 deletePeer(c.peer_id, false);
622 void Connection::send(float dtime)
624 for(core::map<u16, Peer*>::Iterator
625 j = m_peers.getIterator();
626 j.atEnd() == false; j++)
628 Peer *peer = j.getNode()->getValue();
629 peer->m_sendtime_accu += dtime;
630 peer->m_num_sent = 0;
631 peer->m_max_num_sent = peer->m_sendtime_accu *
632 peer->m_max_packets_per_second;
634 Queue<OutgoingPacket> postponed_packets;
635 while(m_outgoing_queue.size() != 0){
636 OutgoingPacket packet = m_outgoing_queue.pop_front();
637 Peer *peer = getPeerNoEx(packet.peer_id);
640 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
641 postponed_packets.push_back(packet);
642 } else if(peer->m_num_sent < peer->m_max_num_sent){
643 rawSendAsPacket(packet.peer_id, packet.channelnum,
644 packet.data, packet.reliable);
647 postponed_packets.push_back(packet);
650 while(postponed_packets.size() != 0){
651 m_outgoing_queue.push_back(postponed_packets.pop_front());
653 for(core::map<u16, Peer*>::Iterator
654 j = m_peers.getIterator();
655 j.atEnd() == false; j++)
657 Peer *peer = j.getNode()->getValue();
658 peer->m_sendtime_accu -= (float)peer->m_num_sent /
659 peer->m_max_packets_per_second;
660 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
661 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
665 // Receive packets from the network and buffers and create ConnectionEvents
666 void Connection::receive()
668 u32 datasize = 100000;
669 // TODO: We can not know how many layers of header there are.
670 // For now, just assume there are no other than the base headers.
671 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
672 Buffer<u8> packetdata(packet_maxsize);
674 bool single_wait_done = false;
679 /* Check if some buffer has relevant data */
682 SharedBuffer<u8> resultdata;
683 bool got = getFromBuffers(peer_id, resultdata);
686 e.dataReceived(peer_id, resultdata);
692 if(single_wait_done){
693 if(m_socket.WaitData(0) == false)
697 single_wait_done = true;
700 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
702 if(received_size < 0)
704 if(received_size < BASE_HEADER_SIZE)
706 if(readU32(&packetdata[0]) != m_protocol_id)
709 u16 peer_id = readPeerId(*packetdata);
710 u8 channelnum = readChannel(*packetdata);
711 if(channelnum > CHANNEL_COUNT-1){
713 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
714 throw InvalidIncomingDataException("Channel doesn't exist");
717 if(peer_id == PEER_ID_INEXISTENT)
720 Somebody is trying to send stuff to us with no peer id.
722 Check if the same address and port was added to our peer
724 Allow only entries that have has_sent_with_id==false.
727 core::map<u16, Peer*>::Iterator j;
728 j = m_peers.getIterator();
729 for(; j.atEnd() == false; j++)
731 Peer *peer = j.getNode()->getValue();
732 if(peer->has_sent_with_id)
734 if(peer->address == sender)
739 If no peer was found with the same address and port,
740 we shall assume it is a new peer and create an entry.
744 // Pass on to adding the peer
746 // Else: A peer was found.
749 Peer *peer = j.getNode()->getValue();
752 derr_con<<"WARNING: Assuming unknown peer to be "
753 <<"peer_id="<<peer_id<<std::endl;
758 The peer was not found in our lists. Add it.
760 if(peer_id == PEER_ID_INEXISTENT)
762 // Somebody wants to make a new connection
764 // Get a unique peer id (2 or higher)
767 Find an unused peer id
769 bool out_of_ids = false;
773 if(m_peers.find(peer_id_new) == NULL)
775 // Check for overflow
776 if(peer_id_new == 65535){
783 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
788 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
789 " giving peer_id="<<peer_id_new<<std::endl;
792 Peer *peer = new Peer(peer_id_new, sender);
793 m_peers.insert(peer->id, peer);
795 // Create peer addition event
797 e.peerAdded(peer_id_new, sender);
800 // Create CONTROL packet to tell the peer id to the new peer.
801 SharedBuffer<u8> reply(4);
802 writeU8(&reply[0], TYPE_CONTROL);
803 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
804 writeU16(&reply[2], peer_id_new);
805 sendAsPacket(peer_id_new, 0, reply, true);
807 // We're now talking to a valid peer_id
808 peer_id = peer_id_new;
810 // Go on and process whatever it sent
813 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
818 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
819 // and it is invalid.
821 derr_con<<"Receive(): Peer not found"<<std::endl;
822 throw InvalidIncomingDataException("Peer not found (possible timeout)");
825 Peer *peer = node->getValue();
827 // Validate peer address
828 if(peer->address != sender)
831 derr_con<<"Peer "<<peer_id<<" sending from different address."
832 " Ignoring."<<std::endl;
836 peer->timeout_counter = 0.0;
838 Channel *channel = &(peer->channels[channelnum]);
840 // Throw the received packet to channel->processPacket()
842 // Make a new SharedBuffer from the data without the base headers
843 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
844 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
845 strippeddata.getSize());
848 // Process it (the result is some data with no headers made by us)
849 SharedBuffer<u8> resultdata = processPacket
850 (channel, strippeddata, peer_id, channelnum, false);
853 dout_con<<"ProcessPacket returned data of size "
854 <<resultdata.getSize()<<std::endl;
856 if(datasize < resultdata.getSize())
857 throw InvalidIncomingDataException
858 ("Buffer too small for received data");
861 e.dataReceived(peer_id, resultdata);
864 }catch(ProcessedSilentlyException &e){
866 }catch(InvalidIncomingDataException &e){
868 catch(ProcessedSilentlyException &e){
873 void Connection::runTimeouts(float dtime)
875 core::list<u16> timeouted_peers;
876 core::map<u16, Peer*>::Iterator j;
877 j = m_peers.getIterator();
878 for(; j.atEnd() == false; j++)
880 Peer *peer = j.getNode()->getValue();
885 peer->timeout_counter += dtime;
886 if(peer->timeout_counter > m_timeout)
889 derr_con<<"RunTimeouts(): Peer "<<peer->id
891 <<" (source=peer->timeout_counter)"
893 // Add peer to the list
894 timeouted_peers.push_back(peer->id);
895 // Don't bother going through the buffers of this one
899 float resend_timeout = peer->resend_timeout;
900 for(u16 i=0; i<CHANNEL_COUNT; i++)
902 core::list<BufferedPacket> timed_outs;
903 core::list<BufferedPacket>::Iterator j;
905 Channel *channel = &peer->channels[i];
907 // Remove timed out incomplete unreliable split packets
908 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
910 // Increment reliable packet times
911 channel->outgoing_reliables.incrementTimeouts(dtime);
913 // Check reliable packet total times, remove peer if
915 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
918 derr_con<<"RunTimeouts(): Peer "<<peer->id
920 <<" (source=reliable packet totaltime)"
922 // Add peer to the to-be-removed list
923 timeouted_peers.push_back(peer->id);
927 // Re-send timed out outgoing reliables
929 timed_outs = channel->
930 outgoing_reliables.getTimedOuts(resend_timeout);
932 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
934 j = timed_outs.begin();
935 for(; j != timed_outs.end(); j++)
937 u16 peer_id = readPeerId(*(j->data));
938 u8 channel = readChannel(*(j->data));
939 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
942 derr_con<<"RE-SENDING timed-out RELIABLE to ";
943 j->address.print(&derr_con);
944 derr_con<<"(t/o="<<resend_timeout<<"): "
945 <<"from_peer_id="<<peer_id
946 <<", channel="<<((int)channel&0xff)
947 <<", seqnum="<<seqnum
952 // Enlarge avg_rtt and resend_timeout:
953 // The rtt will be at least the timeout.
954 // NOTE: This won't affect the timeout of the next
955 // checked channel because it was cached.
956 peer->reportRTT(resend_timeout);
963 peer->ping_timer += dtime;
964 if(peer->ping_timer >= 5.0)
966 // Create and send PING packet
967 SharedBuffer<u8> data(2);
968 writeU8(&data[0], TYPE_CONTROL);
969 writeU8(&data[1], CONTROLTYPE_PING);
970 rawSendAsPacket(peer->id, 0, data, true);
972 peer->ping_timer = 0.0;
979 // Remove timed out peers
980 core::list<u16>::Iterator i = timeouted_peers.begin();
981 for(; i != timeouted_peers.end(); i++)
984 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
985 deletePeer(*i, true);
989 void Connection::serve(u16 port)
991 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
993 m_peer_id = PEER_ID_SERVER;
996 void Connection::connect(Address address)
998 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
999 <<":"<<address.getPort()<<std::endl;
1001 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1003 throw ConnectionException("Already connected to a server");
1006 Peer *peer = new Peer(PEER_ID_SERVER, address);
1007 m_peers.insert(peer->id, peer);
1011 e.peerAdded(peer->id, peer->address);
1016 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1017 m_peer_id = PEER_ID_INEXISTENT;
1018 SharedBuffer<u8> data(0);
1019 Send(PEER_ID_SERVER, 0, data, true);
1022 void Connection::disconnect()
1024 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1026 // Create and send DISCO packet
1027 SharedBuffer<u8> data(2);
1028 writeU8(&data[0], TYPE_CONTROL);
1029 writeU8(&data[1], CONTROLTYPE_DISCO);
1032 core::map<u16, Peer*>::Iterator j;
1033 j = m_peers.getIterator();
1034 for(; j.atEnd() == false; j++)
1036 Peer *peer = j.getNode()->getValue();
1037 rawSendAsPacket(peer->id, 0, data, false);
1041 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1043 core::map<u16, Peer*>::Iterator j;
1044 j = m_peers.getIterator();
1045 for(; j.atEnd() == false; j++)
1047 Peer *peer = j.getNode()->getValue();
1048 send(peer->id, channelnum, data, reliable);
1052 void Connection::send(u16 peer_id, u8 channelnum,
1053 SharedBuffer<u8> data, bool reliable)
1055 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1057 assert(channelnum < CHANNEL_COUNT);
1059 Peer *peer = getPeerNoEx(peer_id);
1062 Channel *channel = &(peer->channels[channelnum]);
1064 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1066 chunksize_max -= RELIABLE_HEADER_SIZE;
1068 core::list<SharedBuffer<u8> > originals;
1069 originals = makeAutoSplitPacket(data, chunksize_max,
1070 channel->next_outgoing_split_seqnum);
1072 core::list<SharedBuffer<u8> >::Iterator i;
1073 i = originals.begin();
1074 for(; i != originals.end(); i++)
1076 SharedBuffer<u8> original = *i;
1078 sendAsPacket(peer_id, channelnum, original, reliable);
1082 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1083 SharedBuffer<u8> data, bool reliable)
1085 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1086 m_outgoing_queue.push_back(packet);
1089 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1090 SharedBuffer<u8> data, bool reliable)
1092 Peer *peer = getPeerNoEx(peer_id);
1095 Channel *channel = &(peer->channels[channelnum]);
1099 u16 seqnum = channel->next_outgoing_seqnum;
1100 channel->next_outgoing_seqnum++;
1102 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1104 // Add base headers and make a packet
1105 BufferedPacket p = makePacket(peer->address, reliable,
1106 m_protocol_id, m_peer_id, channelnum);
1109 // Buffer the packet
1110 channel->outgoing_reliables.insert(p);
1112 catch(AlreadyExistsException &e)
1114 PrintInfo(derr_con);
1115 derr_con<<"WARNING: Going to send a reliable packet "
1116 "seqnum="<<seqnum<<" that is already "
1117 "in outgoing buffer"<<std::endl;
1126 // Add base headers and make a packet
1127 BufferedPacket p = makePacket(peer->address, data,
1128 m_protocol_id, m_peer_id, channelnum);
1135 void Connection::rawSend(const BufferedPacket &packet)
1138 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1139 } catch(SendFailedException &e){
1140 derr_con<<"Connection::rawSend(): SendFailedException: "
1141 <<packet.address.serializeString()<<std::endl;
1145 Peer* Connection::getPeer(u16 peer_id)
1147 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1150 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1154 assert(node->getValue()->id == peer_id);
1156 return node->getValue();
1159 Peer* Connection::getPeerNoEx(u16 peer_id)
1161 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1168 assert(node->getValue()->id == peer_id);
1170 return node->getValue();
1173 core::list<Peer*> Connection::getPeers()
1175 core::list<Peer*> list;
1176 core::map<u16, Peer*>::Iterator j;
1177 j = m_peers.getIterator();
1178 for(; j.atEnd() == false; j++)
1180 Peer *peer = j.getNode()->getValue();
1181 list.push_back(peer);
1186 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1188 core::map<u16, Peer*>::Iterator j;
1189 j = m_peers.getIterator();
1190 for(; j.atEnd() == false; j++)
1192 Peer *peer = j.getNode()->getValue();
1193 for(u16 i=0; i<CHANNEL_COUNT; i++)
1195 Channel *channel = &peer->channels[i];
1196 SharedBuffer<u8> resultdata;
1197 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1207 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1208 SharedBuffer<u8> &dst)
1210 u16 firstseqnum = 0;
1211 // Clear old packets from start of buffer
1214 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1215 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1216 channel->incoming_reliables.popFirst();
1220 // This happens if all packets are old
1221 }catch(con::NotFoundException)
1224 if(channel->incoming_reliables.empty() == false)
1226 if(firstseqnum == channel->next_incoming_seqnum)
1228 BufferedPacket p = channel->incoming_reliables.popFirst();
1230 peer_id = readPeerId(*p.data);
1231 u8 channelnum = readChannel(*p.data);
1232 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1235 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1236 <<" seqnum="<<seqnum
1237 <<" peer_id="<<peer_id
1238 <<" channel="<<((int)channelnum&0xff)
1241 channel->next_incoming_seqnum++;
1243 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1244 // Get out the inside packet and re-process it
1245 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1246 memcpy(*payload, &p.data[headers_size], payload.getSize());
1248 dst = processPacket(channel, payload, peer_id, channelnum, true);
1255 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1256 SharedBuffer<u8> packetdata, u16 peer_id,
1257 u8 channelnum, bool reliable)
1259 IndentationRaiser iraiser(&(m_indentation));
1261 if(packetdata.getSize() < 1)
1262 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1264 u8 type = readU8(&packetdata[0]);
1266 if(type == TYPE_CONTROL)
1268 if(packetdata.getSize() < 2)
1269 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1271 u8 controltype = readU8(&packetdata[1]);
1273 if(controltype == CONTROLTYPE_ACK)
1275 if(packetdata.getSize() < 4)
1276 throw InvalidIncomingDataException
1277 ("packetdata.getSize() < 4 (ACK header size)");
1279 u16 seqnum = readU16(&packetdata[2]);
1281 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1282 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1283 <<", seqnum="<<seqnum<<std::endl;
1286 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1287 // Get round trip time
1288 float rtt = p.totaltime;
1290 // Let peer calculate stuff according to it
1291 // (avg_rtt and resend_timeout)
1292 Peer *peer = getPeer(peer_id);
1293 peer->reportRTT(rtt);
1295 //PrintInfo(dout_con);
1296 //dout_con<<"RTT = "<<rtt<<std::endl;
1298 /*dout_con<<"OUTGOING: ";
1300 channel->outgoing_reliables.print();
1301 dout_con<<std::endl;*/
1303 catch(NotFoundException &e){
1304 PrintInfo(derr_con);
1305 derr_con<<"WARNING: ACKed packet not "
1310 throw ProcessedSilentlyException("Got an ACK");
1312 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1314 if(packetdata.getSize() < 4)
1315 throw InvalidIncomingDataException
1316 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1317 u16 peer_id_new = readU16(&packetdata[2]);
1319 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1321 if(GetPeerID() != PEER_ID_INEXISTENT)
1323 PrintInfo(derr_con);
1324 derr_con<<"WARNING: Not changing"
1325 " existing peer id."<<std::endl;
1329 dout_con<<"changing."<<std::endl;
1330 SetPeerID(peer_id_new);
1332 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1334 else if(controltype == CONTROLTYPE_PING)
1336 // Just ignore it, the incoming data already reset
1337 // the timeout counter
1339 dout_con<<"PING"<<std::endl;
1340 throw ProcessedSilentlyException("Got a PING");
1342 else if(controltype == CONTROLTYPE_DISCO)
1344 // Just ignore it, the incoming data already reset
1345 // the timeout counter
1347 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1349 if(deletePeer(peer_id, false) == false)
1351 PrintInfo(derr_con);
1352 derr_con<<"DISCO: Peer not found"<<std::endl;
1355 throw ProcessedSilentlyException("Got a DISCO");
1358 PrintInfo(derr_con);
1359 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1360 <<((int)controltype&0xff)<<std::endl;
1361 throw InvalidIncomingDataException("Invalid control type");
1364 else if(type == TYPE_ORIGINAL)
1366 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1367 throw InvalidIncomingDataException
1368 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1370 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1372 // Get the inside packet out and return it
1373 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1374 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1377 else if(type == TYPE_SPLIT)
1379 // We have to create a packet again for buffering
1380 // This isn't actually too bad an idea.
1381 BufferedPacket packet = makePacket(
1382 getPeer(peer_id)->address,
1387 // Buffer the packet
1388 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1389 if(data.getSize() != 0)
1392 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1393 <<"size="<<data.getSize()<<std::endl;
1397 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1398 throw ProcessedSilentlyException("Buffered a split packet chunk");
1400 else if(type == TYPE_RELIABLE)
1402 // Recursive reliable packets not allowed
1403 assert(reliable == false);
1405 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1406 throw InvalidIncomingDataException
1407 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1409 u16 seqnum = readU16(&packetdata[1]);
1411 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1412 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1415 if(is_future_packet)
1416 dout_con<<"BUFFERING";
1417 else if(is_old_packet)
1421 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1422 <<" next="<<channel->next_incoming_seqnum;
1423 dout_con<<" [sending CONTROLTYPE_ACK"
1424 " to peer_id="<<peer_id<<"]";
1425 dout_con<<std::endl;
1428 //assert(channel->incoming_reliables.size() < 100);
1430 // Send a CONTROLTYPE_ACK
1431 SharedBuffer<u8> reply(4);
1432 writeU8(&reply[0], TYPE_CONTROL);
1433 writeU8(&reply[1], CONTROLTYPE_ACK);
1434 writeU16(&reply[2], seqnum);
1435 rawSendAsPacket(peer_id, channelnum, reply, false);
1437 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1438 if(is_future_packet)
1441 dout_con<<"Buffering reliable packet (seqnum="
1442 <<seqnum<<")"<<std::endl;*/
1444 // This one comes later, buffer it.
1445 // Actually we have to make a packet to buffer one.
1446 // Well, we have all the ingredients, so just do it.
1447 BufferedPacket packet = makePacket(
1448 getPeer(peer_id)->address,
1454 channel->incoming_reliables.insert(packet);
1457 dout_con<<"INCOMING: ";
1458 channel->incoming_reliables.print();
1459 dout_con<<std::endl;*/
1461 catch(AlreadyExistsException &e)
1465 throw ProcessedSilentlyException("Buffered future reliable packet");
1467 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1468 else if(is_old_packet)
1470 // An old packet, dump it
1471 throw InvalidIncomingDataException("Got an old reliable packet");
1474 channel->next_incoming_seqnum++;
1476 // Get out the inside packet and re-process it
1477 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1478 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1480 return processPacket(channel, payload, peer_id, channelnum, true);
1484 PrintInfo(derr_con);
1485 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1486 throw InvalidIncomingDataException("Invalid packet type");
1489 // We should never get here.
1490 // If you get here, add an exception or a return to some of the
1491 // above conditionals.
1493 throw BaseException("Error in Channel::ProcessPacket()");
1496 bool Connection::deletePeer(u16 peer_id, bool timeout)
1498 if(m_peers.find(peer_id) == NULL)
1501 Peer *peer = m_peers[peer_id];
1505 e.peerRemoved(peer_id, timeout, peer->address);
1508 delete m_peers[peer_id];
1509 m_peers.remove(peer_id);
1515 ConnectionEvent Connection::getEvent()
1517 if(m_event_queue.size() == 0){
1519 e.type = CONNEVENT_NONE;
1522 return m_event_queue.pop_front();
1525 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1528 return m_event_queue.pop_front(timeout_ms);
1529 } catch(ItemNotFoundException &e){
1531 e.type = CONNEVENT_NONE;
1536 void Connection::putCommand(ConnectionCommand &c)
1538 m_command_queue.push_back(c);
1541 void Connection::Serve(unsigned short port)
1543 ConnectionCommand c;
1548 void Connection::Connect(Address address)
1550 ConnectionCommand c;
1555 bool Connection::Connected()
1557 JMutexAutoLock peerlock(m_peers_mutex);
1559 if(m_peers.size() != 1)
1562 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1566 if(m_peer_id == PEER_ID_INEXISTENT)
1572 void Connection::Disconnect()
1574 ConnectionCommand c;
1579 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
1582 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1583 if(e.type != CONNEVENT_NONE)
1584 dout_con<<getDesc()<<": Receive: got event: "
1585 <<e.describe()<<std::endl;
1587 case CONNEVENT_NONE:
1588 throw NoIncomingDataException("No incoming data");
1589 case CONNEVENT_DATA_RECEIVED:
1590 peer_id = e.peer_id;
1591 memcpy(data, *e.data, e.data.getSize());
1592 return e.data.getSize();
1593 case CONNEVENT_PEER_ADDED: {
1594 Peer tmp(e.peer_id, e.address);
1595 if(m_bc_peerhandler)
1596 m_bc_peerhandler->peerAdded(&tmp);
1598 case CONNEVENT_PEER_REMOVED: {
1599 Peer tmp(e.peer_id, e.address);
1600 if(m_bc_peerhandler)
1601 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1605 throw NoIncomingDataException("No incoming data");
1608 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1610 assert(channelnum < CHANNEL_COUNT);
1612 ConnectionCommand c;
1613 c.sendToAll(channelnum, data, reliable);
1617 void Connection::Send(u16 peer_id, u8 channelnum,
1618 SharedBuffer<u8> data, bool reliable)
1620 assert(channelnum < CHANNEL_COUNT);
1622 ConnectionCommand c;
1623 c.send(peer_id, channelnum, data, reliable);
1627 void Connection::RunTimeouts(float dtime)
1632 Address Connection::GetPeerAddress(u16 peer_id)
1634 JMutexAutoLock peerlock(m_peers_mutex);
1635 return getPeer(peer_id)->address;
1638 float Connection::GetPeerAvgRTT(u16 peer_id)
1640 JMutexAutoLock peerlock(m_peers_mutex);
1641 return getPeer(peer_id)->avg_rtt;
1644 void Connection::DeletePeer(u16 peer_id)
1646 ConnectionCommand c;
1647 c.deletePeer(peer_id);
1651 void Connection::PrintInfo(std::ostream &out)
1653 out<<getDesc()<<": ";
1656 void Connection::PrintInfo()
1658 PrintInfo(dout_con);
1661 std::string Connection::getDesc()
1663 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";