3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
29 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
30 u32 protocol_id, u16 sender_peer_id, u8 channel)
32 u32 packet_size = datasize + BASE_HEADER_SIZE;
33 BufferedPacket p(packet_size);
36 writeU32(&p.data[0], protocol_id);
37 writeU16(&p.data[4], sender_peer_id);
38 writeU8(&p.data[6], channel);
40 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
45 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
46 u32 protocol_id, u16 sender_peer_id, u8 channel)
48 return makePacket(address, *data, data.getSize(),
49 protocol_id, sender_peer_id, channel);
52 SharedBuffer<u8> makeOriginalPacket(
53 SharedBuffer<u8> data)
56 u32 packet_size = data.getSize() + header_size;
57 SharedBuffer<u8> b(packet_size);
59 writeU8(&b[0], TYPE_ORIGINAL);
61 memcpy(&b[header_size], *data, data.getSize());
66 core::list<SharedBuffer<u8> > makeSplitPacket(
67 SharedBuffer<u8> data,
71 // Chunk packets, containing the TYPE_SPLIT header
72 core::list<SharedBuffer<u8> > chunks;
74 u32 chunk_header_size = 7;
75 u32 maximum_data_size = chunksize_max - chunk_header_size;
80 end = start + maximum_data_size - 1;
81 if(end > data.getSize() - 1)
82 end = data.getSize() - 1;
84 u32 payload_size = end - start + 1;
85 u32 packet_size = chunk_header_size + payload_size;
87 SharedBuffer<u8> chunk(packet_size);
89 writeU8(&chunk[0], TYPE_SPLIT);
90 writeU16(&chunk[1], seqnum);
91 // [3] u16 chunk_count is written at next stage
92 writeU16(&chunk[5], chunk_num);
93 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
95 chunks.push_back(chunk);
100 while(end != data.getSize() - 1);
102 u16 chunk_count = chunks.getSize();
104 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
105 for(; i != chunks.end(); i++)
108 writeU16(&((*i)[3]), chunk_count);
114 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
115 SharedBuffer<u8> data,
119 u32 original_header_size = 1;
120 core::list<SharedBuffer<u8> > list;
121 if(data.getSize() + original_header_size > chunksize_max)
123 list = makeSplitPacket(data, chunksize_max, split_seqnum);
129 list.push_back(makeOriginalPacket(data));
134 SharedBuffer<u8> makeReliablePacket(
135 SharedBuffer<u8> data,
138 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
139 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
140 <<((unsigned int)data[0]&0xff)<<std::endl;*/
142 u32 packet_size = data.getSize() + header_size;
143 SharedBuffer<u8> b(packet_size);
145 writeU8(&b[0], TYPE_RELIABLE);
146 writeU16(&b[1], seqnum);
148 memcpy(&b[header_size], *data, data.getSize());
150 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
151 <<((unsigned int)data[0]&0xff)<<std::endl;*/
152 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
160 void ReliablePacketBuffer::print()
162 core::list<BufferedPacket>::Iterator i;
164 for(; i != m_list.end(); i++)
166 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
170 bool ReliablePacketBuffer::empty()
172 return m_list.empty();
174 u32 ReliablePacketBuffer::size()
176 return m_list.getSize();
178 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
180 core::list<BufferedPacket>::Iterator i;
182 for(; i != m_list.end(); i++)
184 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
186 <<", comparing to s="<<s<<std::endl;*/
192 RPBSearchResult ReliablePacketBuffer::notFound()
196 u16 ReliablePacketBuffer::getFirstSeqnum()
199 throw NotFoundException("Buffer is empty");
200 BufferedPacket p = *m_list.begin();
201 return readU16(&p.data[BASE_HEADER_SIZE+1]);
203 BufferedPacket ReliablePacketBuffer::popFirst()
206 throw NotFoundException("Buffer is empty");
207 BufferedPacket p = *m_list.begin();
208 core::list<BufferedPacket>::Iterator i = m_list.begin();
212 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
214 RPBSearchResult r = findPacket(seqnum);
216 dout_con<<"Not found"<<std::endl;
217 throw NotFoundException("seqnum not found in buffer");
219 BufferedPacket p = *r;
223 void ReliablePacketBuffer::insert(BufferedPacket &p)
225 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
226 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
227 assert(type == TYPE_RELIABLE);
228 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
230 // Find the right place for the packet and insert it there
232 // If list is empty, just add it
239 // Otherwise find the right place
240 core::list<BufferedPacket>::Iterator i;
242 // Find the first packet in the list which has a higher seqnum
243 for(; i != m_list.end(); i++){
244 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
246 throw AlreadyExistsException("Same seqnum in list");
248 if(seqnum_higher(s, seqnum)){
252 // If we're at the end of the list, add the packet to the
254 if(i == m_list.end())
261 m_list.insert_before(i, p);
264 void ReliablePacketBuffer::incrementTimeouts(float dtime)
266 core::list<BufferedPacket>::Iterator i;
268 for(; i != m_list.end(); i++){
270 i->totaltime += dtime;
274 void ReliablePacketBuffer::resetTimedOuts(float timeout)
276 core::list<BufferedPacket>::Iterator i;
278 for(; i != m_list.end(); i++){
279 if(i->time >= timeout)
284 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
286 core::list<BufferedPacket>::Iterator i;
288 for(; i != m_list.end(); i++){
289 if(i->totaltime >= timeout)
295 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
297 core::list<BufferedPacket> timed_outs;
298 core::list<BufferedPacket>::Iterator i;
300 for(; i != m_list.end(); i++)
302 if(i->time >= timeout)
303 timed_outs.push_back(*i);
312 IncomingSplitBuffer::~IncomingSplitBuffer()
314 core::map<u16, IncomingSplitPacket*>::Iterator i;
315 i = m_buf.getIterator();
316 for(; i.atEnd() == false; i++)
318 delete i.getNode()->getValue();
322 This will throw a GotSplitPacketException when a full
323 split packet is constructed.
325 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
327 u32 headersize = BASE_HEADER_SIZE + 7;
328 assert(p.data.getSize() >= headersize);
329 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
330 assert(type == TYPE_SPLIT);
331 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
332 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
333 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
335 // Add if doesn't exist
336 if(m_buf.find(seqnum) == NULL)
338 IncomingSplitPacket *sp = new IncomingSplitPacket();
339 sp->chunk_count = chunk_count;
340 sp->reliable = reliable;
344 IncomingSplitPacket *sp = m_buf[seqnum];
346 // TODO: These errors should be thrown or something? Dunno.
347 if(chunk_count != sp->chunk_count)
348 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
349 <<" != sp->chunk_count="<<sp->chunk_count
351 if(reliable != sp->reliable)
352 derr_con<<"Connection: WARNING: reliable="<<reliable
353 <<" != sp->reliable="<<sp->reliable
356 // If chunk already exists, cancel
357 if(sp->chunks.find(chunk_num) != NULL)
358 throw AlreadyExistsException("Chunk already in buffer");
360 // Cut chunk data out of packet
361 u32 chunkdatasize = p.data.getSize() - headersize;
362 SharedBuffer<u8> chunkdata(chunkdatasize);
363 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
365 // Set chunk data in buffer
366 sp->chunks[chunk_num] = chunkdata;
368 // If not all chunks are received, return empty buffer
369 if(sp->allReceived() == false)
370 return SharedBuffer<u8>();
372 // Calculate total size
374 core::map<u16, SharedBuffer<u8> >::Iterator i;
375 i = sp->chunks.getIterator();
376 for(; i.atEnd() == false; i++)
378 totalsize += i.getNode()->getValue().getSize();
381 SharedBuffer<u8> fulldata(totalsize);
383 // Copy chunks to data buffer
385 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
388 SharedBuffer<u8> buf = sp->chunks[chunk_i];
389 u16 chunkdatasize = buf.getSize();
390 memcpy(&fulldata[start], *buf, chunkdatasize);
391 start += chunkdatasize;;
394 // Remove sp from buffer
395 m_buf.remove(seqnum);
400 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
402 core::list<u16> remove_queue;
403 core::map<u16, IncomingSplitPacket*>::Iterator i;
404 i = m_buf.getIterator();
405 for(; i.atEnd() == false; i++)
407 IncomingSplitPacket *p = i.getNode()->getValue();
408 // Reliable ones are not removed by timeout
409 if(p->reliable == true)
412 if(p->time >= timeout)
413 remove_queue.push_back(i.getNode()->getKey());
415 core::list<u16>::Iterator j;
416 j = remove_queue.begin();
417 for(; j != remove_queue.end(); j++)
419 dout_con<<"NOTE: Removing timed out unreliable split packet"
432 next_outgoing_seqnum = SEQNUM_INITIAL;
433 next_incoming_seqnum = SEQNUM_INITIAL;
434 next_outgoing_split_seqnum = SEQNUM_INITIAL;
444 Peer::Peer(u16 a_id, Address a_address):
447 timeout_counter(0.0),
451 has_sent_with_id(false),
453 m_max_packets_per_second(10),
462 void Peer::reportRTT(float rtt)
466 if(m_max_packets_per_second < 100)
467 m_max_packets_per_second += 10;
468 } else if(rtt < 0.2){
469 if(m_max_packets_per_second < 100)
470 m_max_packets_per_second += 2;
472 m_max_packets_per_second *= 0.8;
473 if(m_max_packets_per_second < 10)
474 m_max_packets_per_second = 10;
480 else if(avg_rtt < 0.0)
483 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
485 // Calculate resend_timeout
487 /*int reliable_count = 0;
488 for(int i=0; i<CHANNEL_COUNT; i++)
490 reliable_count += channels[i].outgoing_reliables.size();
492 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
493 * ((float)reliable_count * 1);*/
495 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
496 if(timeout < RESEND_TIMEOUT_MIN)
497 timeout = RESEND_TIMEOUT_MIN;
498 if(timeout > RESEND_TIMEOUT_MAX)
499 timeout = RESEND_TIMEOUT_MAX;
500 resend_timeout = timeout;
507 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
508 m_protocol_id(protocol_id),
509 m_max_packet_size(max_packet_size),
512 m_bc_peerhandler(NULL),
513 m_bc_receive_timeout(0),
516 m_socket.setTimeoutMs(5);
521 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
522 PeerHandler *peerhandler):
523 m_protocol_id(protocol_id),
524 m_max_packet_size(max_packet_size),
527 m_bc_peerhandler(peerhandler),
528 m_bc_receive_timeout(0),
531 m_socket.setTimeoutMs(5);
537 Connection::~Connection()
544 void * Connection::Thread()
547 log_register_thread("Connection");
549 dout_con<<"Connection thread started"<<std::endl;
551 u32 curtime = porting::getTimeMs();
552 u32 lasttime = curtime;
556 BEGIN_DEBUG_EXCEPTION_HANDLER
559 curtime = porting::getTimeMs();
560 float dtime = (float)(curtime - lasttime) / 1000.;
568 while(m_command_queue.size() != 0){
569 ConnectionCommand c = m_command_queue.pop_front();
577 END_DEBUG_EXCEPTION_HANDLER(derr_con);
583 void Connection::putEvent(ConnectionEvent &e)
585 assert(e.type != CONNEVENT_NONE);
586 m_event_queue.push_back(e);
589 void Connection::processCommand(ConnectionCommand &c)
593 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
596 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
600 case CONNCMD_CONNECT:
601 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
604 case CONNCMD_DISCONNECT:
605 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
609 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
610 send(c.peer_id, c.channelnum, c.data, c.reliable);
612 case CONNCMD_SEND_TO_ALL:
613 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
614 sendToAll(c.channelnum, c.data, c.reliable);
616 case CONNCMD_DELETE_PEER:
617 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
618 deletePeer(c.peer_id, false);
623 void Connection::send(float dtime)
625 for(core::map<u16, Peer*>::Iterator
626 j = m_peers.getIterator();
627 j.atEnd() == false; j++)
629 Peer *peer = j.getNode()->getValue();
630 peer->m_sendtime_accu += dtime;
631 peer->m_num_sent = 0;
632 peer->m_max_num_sent = peer->m_sendtime_accu *
633 peer->m_max_packets_per_second;
635 Queue<OutgoingPacket> postponed_packets;
636 while(m_outgoing_queue.size() != 0){
637 OutgoingPacket packet = m_outgoing_queue.pop_front();
638 Peer *peer = getPeerNoEx(packet.peer_id);
641 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
642 postponed_packets.push_back(packet);
643 } else if(peer->m_num_sent < peer->m_max_num_sent){
644 rawSendAsPacket(packet.peer_id, packet.channelnum,
645 packet.data, packet.reliable);
648 postponed_packets.push_back(packet);
651 while(postponed_packets.size() != 0){
652 m_outgoing_queue.push_back(postponed_packets.pop_front());
654 for(core::map<u16, Peer*>::Iterator
655 j = m_peers.getIterator();
656 j.atEnd() == false; j++)
658 Peer *peer = j.getNode()->getValue();
659 peer->m_sendtime_accu -= (float)peer->m_num_sent /
660 peer->m_max_packets_per_second;
661 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
662 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
666 // Receive packets from the network and buffers and create ConnectionEvents
667 void Connection::receive()
669 u32 datasize = 100000;
670 // TODO: We can not know how many layers of header there are.
671 // For now, just assume there are no other than the base headers.
672 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
673 Buffer<u8> packetdata(packet_maxsize);
675 bool single_wait_done = false;
680 /* Check if some buffer has relevant data */
683 SharedBuffer<u8> resultdata;
684 bool got = getFromBuffers(peer_id, resultdata);
687 e.dataReceived(peer_id, resultdata);
693 if(single_wait_done){
694 if(m_socket.WaitData(0) == false)
698 single_wait_done = true;
701 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
703 if(received_size < 0)
705 if(received_size < BASE_HEADER_SIZE)
707 if(readU32(&packetdata[0]) != m_protocol_id)
710 u16 peer_id = readPeerId(*packetdata);
711 u8 channelnum = readChannel(*packetdata);
712 if(channelnum > CHANNEL_COUNT-1){
714 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
715 throw InvalidIncomingDataException("Channel doesn't exist");
718 if(peer_id == PEER_ID_INEXISTENT)
721 Somebody is trying to send stuff to us with no peer id.
723 Check if the same address and port was added to our peer
725 Allow only entries that have has_sent_with_id==false.
728 core::map<u16, Peer*>::Iterator j;
729 j = m_peers.getIterator();
730 for(; j.atEnd() == false; j++)
732 Peer *peer = j.getNode()->getValue();
733 if(peer->has_sent_with_id)
735 if(peer->address == sender)
740 If no peer was found with the same address and port,
741 we shall assume it is a new peer and create an entry.
745 // Pass on to adding the peer
747 // Else: A peer was found.
750 Peer *peer = j.getNode()->getValue();
753 derr_con<<"WARNING: Assuming unknown peer to be "
754 <<"peer_id="<<peer_id<<std::endl;
759 The peer was not found in our lists. Add it.
761 if(peer_id == PEER_ID_INEXISTENT)
763 // Somebody wants to make a new connection
765 // Get a unique peer id (2 or higher)
768 Find an unused peer id
770 bool out_of_ids = false;
774 if(m_peers.find(peer_id_new) == NULL)
776 // Check for overflow
777 if(peer_id_new == 65535){
784 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
789 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
790 " giving peer_id="<<peer_id_new<<std::endl;
793 Peer *peer = new Peer(peer_id_new, sender);
794 m_peers.insert(peer->id, peer);
796 // Create peer addition event
798 e.peerAdded(peer_id_new, sender);
801 // Create CONTROL packet to tell the peer id to the new peer.
802 SharedBuffer<u8> reply(4);
803 writeU8(&reply[0], TYPE_CONTROL);
804 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
805 writeU16(&reply[2], peer_id_new);
806 sendAsPacket(peer_id_new, 0, reply, true);
808 // We're now talking to a valid peer_id
809 peer_id = peer_id_new;
811 // Go on and process whatever it sent
814 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
819 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
820 // and it is invalid.
822 derr_con<<"Receive(): Peer not found"<<std::endl;
823 throw InvalidIncomingDataException("Peer not found (possible timeout)");
826 Peer *peer = node->getValue();
828 // Validate peer address
829 if(peer->address != sender)
832 derr_con<<"Peer "<<peer_id<<" sending from different address."
833 " Ignoring."<<std::endl;
837 peer->timeout_counter = 0.0;
839 Channel *channel = &(peer->channels[channelnum]);
841 // Throw the received packet to channel->processPacket()
843 // Make a new SharedBuffer from the data without the base headers
844 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
845 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
846 strippeddata.getSize());
849 // Process it (the result is some data with no headers made by us)
850 SharedBuffer<u8> resultdata = processPacket
851 (channel, strippeddata, peer_id, channelnum, false);
854 dout_con<<"ProcessPacket returned data of size "
855 <<resultdata.getSize()<<std::endl;
857 if(datasize < resultdata.getSize())
858 throw InvalidIncomingDataException
859 ("Buffer too small for received data");
862 e.dataReceived(peer_id, resultdata);
865 }catch(ProcessedSilentlyException &e){
867 }catch(InvalidIncomingDataException &e){
869 catch(ProcessedSilentlyException &e){
874 void Connection::runTimeouts(float dtime)
876 core::list<u16> timeouted_peers;
877 core::map<u16, Peer*>::Iterator j;
878 j = m_peers.getIterator();
879 for(; j.atEnd() == false; j++)
881 Peer *peer = j.getNode()->getValue();
886 peer->timeout_counter += dtime;
887 if(peer->timeout_counter > m_timeout)
890 derr_con<<"RunTimeouts(): Peer "<<peer->id
892 <<" (source=peer->timeout_counter)"
894 // Add peer to the list
895 timeouted_peers.push_back(peer->id);
896 // Don't bother going through the buffers of this one
900 float resend_timeout = peer->resend_timeout;
901 for(u16 i=0; i<CHANNEL_COUNT; i++)
903 core::list<BufferedPacket> timed_outs;
904 core::list<BufferedPacket>::Iterator j;
906 Channel *channel = &peer->channels[i];
908 // Remove timed out incomplete unreliable split packets
909 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
911 // Increment reliable packet times
912 channel->outgoing_reliables.incrementTimeouts(dtime);
914 // Check reliable packet total times, remove peer if
916 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
919 derr_con<<"RunTimeouts(): Peer "<<peer->id
921 <<" (source=reliable packet totaltime)"
923 // Add peer to the to-be-removed list
924 timeouted_peers.push_back(peer->id);
928 // Re-send timed out outgoing reliables
930 timed_outs = channel->
931 outgoing_reliables.getTimedOuts(resend_timeout);
933 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
935 j = timed_outs.begin();
936 for(; j != timed_outs.end(); j++)
938 u16 peer_id = readPeerId(*(j->data));
939 u8 channel = readChannel(*(j->data));
940 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
943 derr_con<<"RE-SENDING timed-out RELIABLE to ";
944 j->address.print(&derr_con);
945 derr_con<<"(t/o="<<resend_timeout<<"): "
946 <<"from_peer_id="<<peer_id
947 <<", channel="<<((int)channel&0xff)
948 <<", seqnum="<<seqnum
953 // Enlarge avg_rtt and resend_timeout:
954 // The rtt will be at least the timeout.
955 // NOTE: This won't affect the timeout of the next
956 // checked channel because it was cached.
957 peer->reportRTT(resend_timeout);
964 peer->ping_timer += dtime;
965 if(peer->ping_timer >= 5.0)
967 // Create and send PING packet
968 SharedBuffer<u8> data(2);
969 writeU8(&data[0], TYPE_CONTROL);
970 writeU8(&data[1], CONTROLTYPE_PING);
971 rawSendAsPacket(peer->id, 0, data, true);
973 peer->ping_timer = 0.0;
980 // Remove timed out peers
981 core::list<u16>::Iterator i = timeouted_peers.begin();
982 for(; i != timeouted_peers.end(); i++)
985 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
986 deletePeer(*i, true);
990 void Connection::serve(u16 port)
992 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
994 m_peer_id = PEER_ID_SERVER;
997 void Connection::connect(Address address)
999 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1000 <<":"<<address.getPort()<<std::endl;
1002 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1004 throw ConnectionException("Already connected to a server");
1007 Peer *peer = new Peer(PEER_ID_SERVER, address);
1008 m_peers.insert(peer->id, peer);
1012 e.peerAdded(peer->id, peer->address);
1017 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1018 m_peer_id = PEER_ID_INEXISTENT;
1019 SharedBuffer<u8> data(0);
1020 Send(PEER_ID_SERVER, 0, data, true);
1023 void Connection::disconnect()
1025 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1027 // Create and send DISCO packet
1028 SharedBuffer<u8> data(2);
1029 writeU8(&data[0], TYPE_CONTROL);
1030 writeU8(&data[1], CONTROLTYPE_DISCO);
1033 core::map<u16, Peer*>::Iterator j;
1034 j = m_peers.getIterator();
1035 for(; j.atEnd() == false; j++)
1037 Peer *peer = j.getNode()->getValue();
1038 rawSendAsPacket(peer->id, 0, data, false);
1042 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1044 core::map<u16, Peer*>::Iterator j;
1045 j = m_peers.getIterator();
1046 for(; j.atEnd() == false; j++)
1048 Peer *peer = j.getNode()->getValue();
1049 send(peer->id, channelnum, data, reliable);
1053 void Connection::send(u16 peer_id, u8 channelnum,
1054 SharedBuffer<u8> data, bool reliable)
1056 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1058 assert(channelnum < CHANNEL_COUNT);
1060 Peer *peer = getPeerNoEx(peer_id);
1063 Channel *channel = &(peer->channels[channelnum]);
1065 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1067 chunksize_max -= RELIABLE_HEADER_SIZE;
1069 core::list<SharedBuffer<u8> > originals;
1070 originals = makeAutoSplitPacket(data, chunksize_max,
1071 channel->next_outgoing_split_seqnum);
1073 core::list<SharedBuffer<u8> >::Iterator i;
1074 i = originals.begin();
1075 for(; i != originals.end(); i++)
1077 SharedBuffer<u8> original = *i;
1079 sendAsPacket(peer_id, channelnum, original, reliable);
1083 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1084 SharedBuffer<u8> data, bool reliable)
1086 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1087 m_outgoing_queue.push_back(packet);
1090 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1091 SharedBuffer<u8> data, bool reliable)
1093 Peer *peer = getPeerNoEx(peer_id);
1096 Channel *channel = &(peer->channels[channelnum]);
1100 u16 seqnum = channel->next_outgoing_seqnum;
1101 channel->next_outgoing_seqnum++;
1103 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1105 // Add base headers and make a packet
1106 BufferedPacket p = makePacket(peer->address, reliable,
1107 m_protocol_id, m_peer_id, channelnum);
1110 // Buffer the packet
1111 channel->outgoing_reliables.insert(p);
1113 catch(AlreadyExistsException &e)
1115 PrintInfo(derr_con);
1116 derr_con<<"WARNING: Going to send a reliable packet "
1117 "seqnum="<<seqnum<<" that is already "
1118 "in outgoing buffer"<<std::endl;
1127 // Add base headers and make a packet
1128 BufferedPacket p = makePacket(peer->address, data,
1129 m_protocol_id, m_peer_id, channelnum);
1136 void Connection::rawSend(const BufferedPacket &packet)
1139 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1140 } catch(SendFailedException &e){
1141 derr_con<<"Connection::rawSend(): SendFailedException: "
1142 <<packet.address.serializeString()<<std::endl;
1146 Peer* Connection::getPeer(u16 peer_id)
1148 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1151 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1155 assert(node->getValue()->id == peer_id);
1157 return node->getValue();
1160 Peer* Connection::getPeerNoEx(u16 peer_id)
1162 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1169 assert(node->getValue()->id == peer_id);
1171 return node->getValue();
1174 core::list<Peer*> Connection::getPeers()
1176 core::list<Peer*> list;
1177 core::map<u16, Peer*>::Iterator j;
1178 j = m_peers.getIterator();
1179 for(; j.atEnd() == false; j++)
1181 Peer *peer = j.getNode()->getValue();
1182 list.push_back(peer);
1187 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1189 core::map<u16, Peer*>::Iterator j;
1190 j = m_peers.getIterator();
1191 for(; j.atEnd() == false; j++)
1193 Peer *peer = j.getNode()->getValue();
1194 for(u16 i=0; i<CHANNEL_COUNT; i++)
1196 Channel *channel = &peer->channels[i];
1197 SharedBuffer<u8> resultdata;
1198 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1208 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1209 SharedBuffer<u8> &dst)
1211 u16 firstseqnum = 0;
1212 // Clear old packets from start of buffer
1215 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1216 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1217 channel->incoming_reliables.popFirst();
1221 // This happens if all packets are old
1222 }catch(con::NotFoundException)
1225 if(channel->incoming_reliables.empty() == false)
1227 if(firstseqnum == channel->next_incoming_seqnum)
1229 BufferedPacket p = channel->incoming_reliables.popFirst();
1231 peer_id = readPeerId(*p.data);
1232 u8 channelnum = readChannel(*p.data);
1233 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1236 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1237 <<" seqnum="<<seqnum
1238 <<" peer_id="<<peer_id
1239 <<" channel="<<((int)channelnum&0xff)
1242 channel->next_incoming_seqnum++;
1244 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1245 // Get out the inside packet and re-process it
1246 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1247 memcpy(*payload, &p.data[headers_size], payload.getSize());
1249 dst = processPacket(channel, payload, peer_id, channelnum, true);
1256 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1257 SharedBuffer<u8> packetdata, u16 peer_id,
1258 u8 channelnum, bool reliable)
1260 IndentationRaiser iraiser(&(m_indentation));
1262 if(packetdata.getSize() < 1)
1263 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1265 u8 type = readU8(&packetdata[0]);
1267 if(type == TYPE_CONTROL)
1269 if(packetdata.getSize() < 2)
1270 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1272 u8 controltype = readU8(&packetdata[1]);
1274 if(controltype == CONTROLTYPE_ACK)
1276 if(packetdata.getSize() < 4)
1277 throw InvalidIncomingDataException
1278 ("packetdata.getSize() < 4 (ACK header size)");
1280 u16 seqnum = readU16(&packetdata[2]);
1282 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1283 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1284 <<", seqnum="<<seqnum<<std::endl;
1287 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1288 // Get round trip time
1289 float rtt = p.totaltime;
1291 // Let peer calculate stuff according to it
1292 // (avg_rtt and resend_timeout)
1293 Peer *peer = getPeer(peer_id);
1294 peer->reportRTT(rtt);
1296 //PrintInfo(dout_con);
1297 //dout_con<<"RTT = "<<rtt<<std::endl;
1299 /*dout_con<<"OUTGOING: ";
1301 channel->outgoing_reliables.print();
1302 dout_con<<std::endl;*/
1304 catch(NotFoundException &e){
1305 PrintInfo(derr_con);
1306 derr_con<<"WARNING: ACKed packet not "
1311 throw ProcessedSilentlyException("Got an ACK");
1313 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1315 if(packetdata.getSize() < 4)
1316 throw InvalidIncomingDataException
1317 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1318 u16 peer_id_new = readU16(&packetdata[2]);
1320 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1322 if(GetPeerID() != PEER_ID_INEXISTENT)
1324 PrintInfo(derr_con);
1325 derr_con<<"WARNING: Not changing"
1326 " existing peer id."<<std::endl;
1330 dout_con<<"changing."<<std::endl;
1331 SetPeerID(peer_id_new);
1333 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1335 else if(controltype == CONTROLTYPE_PING)
1337 // Just ignore it, the incoming data already reset
1338 // the timeout counter
1340 dout_con<<"PING"<<std::endl;
1341 throw ProcessedSilentlyException("Got a PING");
1343 else if(controltype == CONTROLTYPE_DISCO)
1345 // Just ignore it, the incoming data already reset
1346 // the timeout counter
1348 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1350 if(deletePeer(peer_id, false) == false)
1352 PrintInfo(derr_con);
1353 derr_con<<"DISCO: Peer not found"<<std::endl;
1356 throw ProcessedSilentlyException("Got a DISCO");
1359 PrintInfo(derr_con);
1360 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1361 <<((int)controltype&0xff)<<std::endl;
1362 throw InvalidIncomingDataException("Invalid control type");
1365 else if(type == TYPE_ORIGINAL)
1367 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1368 throw InvalidIncomingDataException
1369 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1371 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1373 // Get the inside packet out and return it
1374 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1375 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1378 else if(type == TYPE_SPLIT)
1380 // We have to create a packet again for buffering
1381 // This isn't actually too bad an idea.
1382 BufferedPacket packet = makePacket(
1383 getPeer(peer_id)->address,
1388 // Buffer the packet
1389 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1390 if(data.getSize() != 0)
1393 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1394 <<"size="<<data.getSize()<<std::endl;
1398 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1399 throw ProcessedSilentlyException("Buffered a split packet chunk");
1401 else if(type == TYPE_RELIABLE)
1403 // Recursive reliable packets not allowed
1404 assert(reliable == false);
1406 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1407 throw InvalidIncomingDataException
1408 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1410 u16 seqnum = readU16(&packetdata[1]);
1412 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1413 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1416 if(is_future_packet)
1417 dout_con<<"BUFFERING";
1418 else if(is_old_packet)
1422 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1423 <<" next="<<channel->next_incoming_seqnum;
1424 dout_con<<" [sending CONTROLTYPE_ACK"
1425 " to peer_id="<<peer_id<<"]";
1426 dout_con<<std::endl;
1429 //assert(channel->incoming_reliables.size() < 100);
1431 // Send a CONTROLTYPE_ACK
1432 SharedBuffer<u8> reply(4);
1433 writeU8(&reply[0], TYPE_CONTROL);
1434 writeU8(&reply[1], CONTROLTYPE_ACK);
1435 writeU16(&reply[2], seqnum);
1436 rawSendAsPacket(peer_id, channelnum, reply, false);
1438 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1439 if(is_future_packet)
1442 dout_con<<"Buffering reliable packet (seqnum="
1443 <<seqnum<<")"<<std::endl;*/
1445 // This one comes later, buffer it.
1446 // Actually we have to make a packet to buffer one.
1447 // Well, we have all the ingredients, so just do it.
1448 BufferedPacket packet = makePacket(
1449 getPeer(peer_id)->address,
1455 channel->incoming_reliables.insert(packet);
1458 dout_con<<"INCOMING: ";
1459 channel->incoming_reliables.print();
1460 dout_con<<std::endl;*/
1462 catch(AlreadyExistsException &e)
1466 throw ProcessedSilentlyException("Buffered future reliable packet");
1468 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1469 else if(is_old_packet)
1471 // An old packet, dump it
1472 throw InvalidIncomingDataException("Got an old reliable packet");
1475 channel->next_incoming_seqnum++;
1477 // Get out the inside packet and re-process it
1478 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1479 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1481 return processPacket(channel, payload, peer_id, channelnum, true);
1485 PrintInfo(derr_con);
1486 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1487 throw InvalidIncomingDataException("Invalid packet type");
1490 // We should never get here.
1491 // If you get here, add an exception or a return to some of the
1492 // above conditionals.
1494 throw BaseException("Error in Channel::ProcessPacket()");
1497 bool Connection::deletePeer(u16 peer_id, bool timeout)
1499 if(m_peers.find(peer_id) == NULL)
1502 Peer *peer = m_peers[peer_id];
1506 e.peerRemoved(peer_id, timeout, peer->address);
1509 delete m_peers[peer_id];
1510 m_peers.remove(peer_id);
1516 ConnectionEvent Connection::getEvent()
1518 if(m_event_queue.size() == 0){
1520 e.type = CONNEVENT_NONE;
1523 return m_event_queue.pop_front();
1526 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1529 return m_event_queue.pop_front(timeout_ms);
1530 } catch(ItemNotFoundException &ex){
1532 e.type = CONNEVENT_NONE;
1537 void Connection::putCommand(ConnectionCommand &c)
1539 m_command_queue.push_back(c);
1542 void Connection::Serve(unsigned short port)
1544 ConnectionCommand c;
1549 void Connection::Connect(Address address)
1551 ConnectionCommand c;
1556 bool Connection::Connected()
1558 JMutexAutoLock peerlock(m_peers_mutex);
1560 if(m_peers.size() != 1)
1563 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1567 if(m_peer_id == PEER_ID_INEXISTENT)
1573 void Connection::Disconnect()
1575 ConnectionCommand c;
1580 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
1583 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1584 if(e.type != CONNEVENT_NONE)
1585 dout_con<<getDesc()<<": Receive: got event: "
1586 <<e.describe()<<std::endl;
1588 case CONNEVENT_NONE:
1589 throw NoIncomingDataException("No incoming data");
1590 case CONNEVENT_DATA_RECEIVED:
1591 peer_id = e.peer_id;
1592 memcpy(data, *e.data, e.data.getSize());
1593 return e.data.getSize();
1594 case CONNEVENT_PEER_ADDED: {
1595 Peer tmp(e.peer_id, e.address);
1596 if(m_bc_peerhandler)
1597 m_bc_peerhandler->peerAdded(&tmp);
1599 case CONNEVENT_PEER_REMOVED: {
1600 Peer tmp(e.peer_id, e.address);
1601 if(m_bc_peerhandler)
1602 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1606 throw NoIncomingDataException("No incoming data");
1609 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1611 assert(channelnum < CHANNEL_COUNT);
1613 ConnectionCommand c;
1614 c.sendToAll(channelnum, data, reliable);
1618 void Connection::Send(u16 peer_id, u8 channelnum,
1619 SharedBuffer<u8> data, bool reliable)
1621 assert(channelnum < CHANNEL_COUNT);
1623 ConnectionCommand c;
1624 c.send(peer_id, channelnum, data, reliable);
1628 void Connection::RunTimeouts(float dtime)
1633 Address Connection::GetPeerAddress(u16 peer_id)
1635 JMutexAutoLock peerlock(m_peers_mutex);
1636 return getPeer(peer_id)->address;
1639 float Connection::GetPeerAvgRTT(u16 peer_id)
1641 JMutexAutoLock peerlock(m_peers_mutex);
1642 return getPeer(peer_id)->avg_rtt;
1645 void Connection::DeletePeer(u16 peer_id)
1647 ConnectionCommand c;
1648 c.deletePeer(peer_id);
1652 void Connection::PrintInfo(std::ostream &out)
1654 out<<getDesc()<<": ";
1657 void Connection::PrintInfo()
1659 PrintInfo(dout_con);
1662 std::string Connection::getDesc()
1664 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";