3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
29 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
30 u32 protocol_id, u16 sender_peer_id, u8 channel)
32 u32 packet_size = datasize + BASE_HEADER_SIZE;
33 BufferedPacket p(packet_size);
36 writeU32(&p.data[0], protocol_id);
37 writeU16(&p.data[4], sender_peer_id);
38 writeU8(&p.data[6], channel);
40 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
45 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
46 u32 protocol_id, u16 sender_peer_id, u8 channel)
48 return makePacket(address, *data, data.getSize(),
49 protocol_id, sender_peer_id, channel);
52 SharedBuffer<u8> makeOriginalPacket(
53 SharedBuffer<u8> data)
56 u32 packet_size = data.getSize() + header_size;
57 SharedBuffer<u8> b(packet_size);
59 writeU8(&b[0], TYPE_ORIGINAL);
61 memcpy(&b[header_size], *data, data.getSize());
66 core::list<SharedBuffer<u8> > makeSplitPacket(
67 SharedBuffer<u8> data,
71 // Chunk packets, containing the TYPE_SPLIT header
72 core::list<SharedBuffer<u8> > chunks;
74 u32 chunk_header_size = 7;
75 u32 maximum_data_size = chunksize_max - chunk_header_size;
80 end = start + maximum_data_size - 1;
81 if(end > data.getSize() - 1)
82 end = data.getSize() - 1;
84 u32 payload_size = end - start + 1;
85 u32 packet_size = chunk_header_size + payload_size;
87 SharedBuffer<u8> chunk(packet_size);
89 writeU8(&chunk[0], TYPE_SPLIT);
90 writeU16(&chunk[1], seqnum);
91 // [3] u16 chunk_count is written at next stage
92 writeU16(&chunk[5], chunk_num);
93 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
95 chunks.push_back(chunk);
100 while(end != data.getSize() - 1);
102 u16 chunk_count = chunks.getSize();
104 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
105 for(; i != chunks.end(); i++)
108 writeU16(&((*i)[3]), chunk_count);
114 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
115 SharedBuffer<u8> data,
119 u32 original_header_size = 1;
120 core::list<SharedBuffer<u8> > list;
121 if(data.getSize() + original_header_size > chunksize_max)
123 list = makeSplitPacket(data, chunksize_max, split_seqnum);
129 list.push_back(makeOriginalPacket(data));
134 SharedBuffer<u8> makeReliablePacket(
135 SharedBuffer<u8> data,
138 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
139 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
140 <<((unsigned int)data[0]&0xff)<<std::endl;*/
142 u32 packet_size = data.getSize() + header_size;
143 SharedBuffer<u8> b(packet_size);
145 writeU8(&b[0], TYPE_RELIABLE);
146 writeU16(&b[1], seqnum);
148 memcpy(&b[header_size], *data, data.getSize());
150 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
151 <<((unsigned int)data[0]&0xff)<<std::endl;*/
152 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
160 void ReliablePacketBuffer::print()
162 core::list<BufferedPacket>::Iterator i;
164 for(; i != m_list.end(); i++)
166 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
170 bool ReliablePacketBuffer::empty()
172 return m_list.empty();
174 u32 ReliablePacketBuffer::size()
176 return m_list.getSize();
178 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
180 core::list<BufferedPacket>::Iterator i;
182 for(; i != m_list.end(); i++)
184 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
186 <<", comparing to s="<<s<<std::endl;*/
192 RPBSearchResult ReliablePacketBuffer::notFound()
196 u16 ReliablePacketBuffer::getFirstSeqnum()
199 throw NotFoundException("Buffer is empty");
200 BufferedPacket p = *m_list.begin();
201 return readU16(&p.data[BASE_HEADER_SIZE+1]);
203 BufferedPacket ReliablePacketBuffer::popFirst()
206 throw NotFoundException("Buffer is empty");
207 BufferedPacket p = *m_list.begin();
208 core::list<BufferedPacket>::Iterator i = m_list.begin();
212 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
214 RPBSearchResult r = findPacket(seqnum);
216 dout_con<<"Not found"<<std::endl;
217 throw NotFoundException("seqnum not found in buffer");
219 BufferedPacket p = *r;
223 void ReliablePacketBuffer::insert(BufferedPacket &p)
225 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
226 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
227 assert(type == TYPE_RELIABLE);
228 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
230 // Find the right place for the packet and insert it there
232 // If list is empty, just add it
239 // Otherwise find the right place
240 core::list<BufferedPacket>::Iterator i;
242 // Find the first packet in the list which has a higher seqnum
243 for(; i != m_list.end(); i++){
244 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
246 throw AlreadyExistsException("Same seqnum in list");
248 if(seqnum_higher(s, seqnum)){
252 // If we're at the end of the list, add the packet to the
254 if(i == m_list.end())
261 m_list.insert_before(i, p);
264 void ReliablePacketBuffer::incrementTimeouts(float dtime)
266 core::list<BufferedPacket>::Iterator i;
268 for(; i != m_list.end(); i++){
270 i->totaltime += dtime;
274 void ReliablePacketBuffer::resetTimedOuts(float timeout)
276 core::list<BufferedPacket>::Iterator i;
278 for(; i != m_list.end(); i++){
279 if(i->time >= timeout)
284 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
286 core::list<BufferedPacket>::Iterator i;
288 for(; i != m_list.end(); i++){
289 if(i->totaltime >= timeout)
295 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
297 core::list<BufferedPacket> timed_outs;
298 core::list<BufferedPacket>::Iterator i;
300 for(; i != m_list.end(); i++)
302 if(i->time >= timeout)
303 timed_outs.push_back(*i);
312 IncomingSplitBuffer::~IncomingSplitBuffer()
314 core::map<u16, IncomingSplitPacket*>::Iterator i;
315 i = m_buf.getIterator();
316 for(; i.atEnd() == false; i++)
318 delete i.getNode()->getValue();
322 This will throw a GotSplitPacketException when a full
323 split packet is constructed.
325 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
327 u32 headersize = BASE_HEADER_SIZE + 7;
328 assert(p.data.getSize() >= headersize);
329 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
330 assert(type == TYPE_SPLIT);
331 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
332 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
333 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
335 // Add if doesn't exist
336 if(m_buf.find(seqnum) == NULL)
338 IncomingSplitPacket *sp = new IncomingSplitPacket();
339 sp->chunk_count = chunk_count;
340 sp->reliable = reliable;
344 IncomingSplitPacket *sp = m_buf[seqnum];
346 // TODO: These errors should be thrown or something? Dunno.
347 if(chunk_count != sp->chunk_count)
348 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
349 <<" != sp->chunk_count="<<sp->chunk_count
351 if(reliable != sp->reliable)
352 derr_con<<"Connection: WARNING: reliable="<<reliable
353 <<" != sp->reliable="<<sp->reliable
356 // If chunk already exists, cancel
357 if(sp->chunks.find(chunk_num) != NULL)
358 throw AlreadyExistsException("Chunk already in buffer");
360 // Cut chunk data out of packet
361 u32 chunkdatasize = p.data.getSize() - headersize;
362 SharedBuffer<u8> chunkdata(chunkdatasize);
363 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
365 // Set chunk data in buffer
366 sp->chunks[chunk_num] = chunkdata;
368 // If not all chunks are received, return empty buffer
369 if(sp->allReceived() == false)
370 return SharedBuffer<u8>();
372 // Calculate total size
374 core::map<u16, SharedBuffer<u8> >::Iterator i;
375 i = sp->chunks.getIterator();
376 for(; i.atEnd() == false; i++)
378 totalsize += i.getNode()->getValue().getSize();
381 SharedBuffer<u8> fulldata(totalsize);
383 // Copy chunks to data buffer
385 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
388 SharedBuffer<u8> buf = sp->chunks[chunk_i];
389 u16 chunkdatasize = buf.getSize();
390 memcpy(&fulldata[start], *buf, chunkdatasize);
391 start += chunkdatasize;;
394 // Remove sp from buffer
395 m_buf.remove(seqnum);
400 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
402 core::list<u16> remove_queue;
403 core::map<u16, IncomingSplitPacket*>::Iterator i;
404 i = m_buf.getIterator();
405 for(; i.atEnd() == false; i++)
407 IncomingSplitPacket *p = i.getNode()->getValue();
408 // Reliable ones are not removed by timeout
409 if(p->reliable == true)
412 if(p->time >= timeout)
413 remove_queue.push_back(i.getNode()->getKey());
415 core::list<u16>::Iterator j;
416 j = remove_queue.begin();
417 for(; j != remove_queue.end(); j++)
419 dout_con<<"NOTE: Removing timed out unreliable split packet"
432 next_outgoing_seqnum = SEQNUM_INITIAL;
433 next_incoming_seqnum = SEQNUM_INITIAL;
434 next_outgoing_split_seqnum = SEQNUM_INITIAL;
444 Peer::Peer(u16 a_id, Address a_address):
447 timeout_counter(0.0),
451 has_sent_with_id(false),
453 m_max_packets_per_second(10),
462 void Peer::reportRTT(float rtt)
466 if(m_max_packets_per_second < 400)
467 m_max_packets_per_second += 10;
468 } else if(rtt < 0.2){
469 if(m_max_packets_per_second < 100)
470 m_max_packets_per_second += 2;
472 m_max_packets_per_second *= 0.8;
473 if(m_max_packets_per_second < 10)
474 m_max_packets_per_second = 10;
480 else if(avg_rtt < 0.0)
483 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
485 // Calculate resend_timeout
487 /*int reliable_count = 0;
488 for(int i=0; i<CHANNEL_COUNT; i++)
490 reliable_count += channels[i].outgoing_reliables.size();
492 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
493 * ((float)reliable_count * 1);*/
495 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
496 if(timeout < RESEND_TIMEOUT_MIN)
497 timeout = RESEND_TIMEOUT_MIN;
498 if(timeout > RESEND_TIMEOUT_MAX)
499 timeout = RESEND_TIMEOUT_MAX;
500 resend_timeout = timeout;
507 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
508 m_protocol_id(protocol_id),
509 m_max_packet_size(max_packet_size),
512 m_bc_peerhandler(NULL),
513 m_bc_receive_timeout(0),
516 m_socket.setTimeoutMs(5);
521 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
522 PeerHandler *peerhandler):
523 m_protocol_id(protocol_id),
524 m_max_packet_size(max_packet_size),
527 m_bc_peerhandler(peerhandler),
528 m_bc_receive_timeout(0),
531 m_socket.setTimeoutMs(5);
537 Connection::~Connection()
541 for(core::map<u16, Peer*>::Iterator
542 j = m_peers.getIterator();
543 j.atEnd() == false; j++)
545 Peer *peer = j.getNode()->getValue();
552 void * Connection::Thread()
555 log_register_thread("Connection");
557 dout_con<<"Connection thread started"<<std::endl;
559 u32 curtime = porting::getTimeMs();
560 u32 lasttime = curtime;
564 BEGIN_DEBUG_EXCEPTION_HANDLER
567 curtime = porting::getTimeMs();
568 float dtime = (float)(curtime - lasttime) / 1000.;
576 while(m_command_queue.size() != 0){
577 ConnectionCommand c = m_command_queue.pop_front();
585 END_DEBUG_EXCEPTION_HANDLER(derr_con);
591 void Connection::putEvent(ConnectionEvent &e)
593 assert(e.type != CONNEVENT_NONE);
594 m_event_queue.push_back(e);
597 void Connection::processCommand(ConnectionCommand &c)
601 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
604 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
608 case CONNCMD_CONNECT:
609 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
612 case CONNCMD_DISCONNECT:
613 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
617 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
618 send(c.peer_id, c.channelnum, c.data, c.reliable);
620 case CONNCMD_SEND_TO_ALL:
621 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
622 sendToAll(c.channelnum, c.data, c.reliable);
624 case CONNCMD_DELETE_PEER:
625 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
626 deletePeer(c.peer_id, false);
631 void Connection::send(float dtime)
633 for(core::map<u16, Peer*>::Iterator
634 j = m_peers.getIterator();
635 j.atEnd() == false; j++)
637 Peer *peer = j.getNode()->getValue();
638 peer->m_sendtime_accu += dtime;
639 peer->m_num_sent = 0;
640 peer->m_max_num_sent = peer->m_sendtime_accu *
641 peer->m_max_packets_per_second;
643 Queue<OutgoingPacket> postponed_packets;
644 while(m_outgoing_queue.size() != 0){
645 OutgoingPacket packet = m_outgoing_queue.pop_front();
646 Peer *peer = getPeerNoEx(packet.peer_id);
649 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
650 postponed_packets.push_back(packet);
651 } else if(peer->m_num_sent < peer->m_max_num_sent){
652 rawSendAsPacket(packet.peer_id, packet.channelnum,
653 packet.data, packet.reliable);
656 postponed_packets.push_back(packet);
659 while(postponed_packets.size() != 0){
660 m_outgoing_queue.push_back(postponed_packets.pop_front());
662 for(core::map<u16, Peer*>::Iterator
663 j = m_peers.getIterator();
664 j.atEnd() == false; j++)
666 Peer *peer = j.getNode()->getValue();
667 peer->m_sendtime_accu -= (float)peer->m_num_sent /
668 peer->m_max_packets_per_second;
669 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
670 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
674 // Receive packets from the network and buffers and create ConnectionEvents
675 void Connection::receive()
677 u32 datasize = m_max_packet_size * 2; // Double it just to be safe
678 // TODO: We can not know how many layers of header there are.
679 // For now, just assume there are no other than the base headers.
680 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
681 SharedBuffer<u8> packetdata(packet_maxsize);
683 bool single_wait_done = false;
688 /* Check if some buffer has relevant data */
691 SharedBuffer<u8> resultdata;
692 bool got = getFromBuffers(peer_id, resultdata);
695 e.dataReceived(peer_id, resultdata);
701 if(single_wait_done){
702 if(m_socket.WaitData(0) == false)
706 single_wait_done = true;
709 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
711 if(received_size < 0)
713 if(received_size < BASE_HEADER_SIZE)
715 if(readU32(&packetdata[0]) != m_protocol_id)
718 u16 peer_id = readPeerId(*packetdata);
719 u8 channelnum = readChannel(*packetdata);
720 if(channelnum > CHANNEL_COUNT-1){
722 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
723 throw InvalidIncomingDataException("Channel doesn't exist");
726 if(peer_id == PEER_ID_INEXISTENT)
729 Somebody is trying to send stuff to us with no peer id.
731 Check if the same address and port was added to our peer
733 Allow only entries that have has_sent_with_id==false.
736 core::map<u16, Peer*>::Iterator j;
737 j = m_peers.getIterator();
738 for(; j.atEnd() == false; j++)
740 Peer *peer = j.getNode()->getValue();
741 if(peer->has_sent_with_id)
743 if(peer->address == sender)
748 If no peer was found with the same address and port,
749 we shall assume it is a new peer and create an entry.
753 // Pass on to adding the peer
755 // Else: A peer was found.
758 Peer *peer = j.getNode()->getValue();
761 derr_con<<"WARNING: Assuming unknown peer to be "
762 <<"peer_id="<<peer_id<<std::endl;
767 The peer was not found in our lists. Add it.
769 if(peer_id == PEER_ID_INEXISTENT)
771 // Somebody wants to make a new connection
773 // Get a unique peer id (2 or higher)
776 Find an unused peer id
778 bool out_of_ids = false;
782 if(m_peers.find(peer_id_new) == NULL)
784 // Check for overflow
785 if(peer_id_new == 65535){
792 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
797 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
798 " giving peer_id="<<peer_id_new<<std::endl;
801 Peer *peer = new Peer(peer_id_new, sender);
802 m_peers.insert(peer->id, peer);
804 // Create peer addition event
806 e.peerAdded(peer_id_new, sender);
809 // Create CONTROL packet to tell the peer id to the new peer.
810 SharedBuffer<u8> reply(4);
811 writeU8(&reply[0], TYPE_CONTROL);
812 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
813 writeU16(&reply[2], peer_id_new);
814 sendAsPacket(peer_id_new, 0, reply, true);
816 // We're now talking to a valid peer_id
817 peer_id = peer_id_new;
819 // Go on and process whatever it sent
822 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
827 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
828 // and it is invalid.
830 derr_con<<"Receive(): Peer not found"<<std::endl;
831 throw InvalidIncomingDataException("Peer not found (possible timeout)");
834 Peer *peer = node->getValue();
836 // Validate peer address
837 if(peer->address != sender)
840 derr_con<<"Peer "<<peer_id<<" sending from different address."
841 " Ignoring."<<std::endl;
845 peer->timeout_counter = 0.0;
847 Channel *channel = &(peer->channels[channelnum]);
849 // Throw the received packet to channel->processPacket()
851 // Make a new SharedBuffer from the data without the base headers
852 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
853 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
854 strippeddata.getSize());
857 // Process it (the result is some data with no headers made by us)
858 SharedBuffer<u8> resultdata = processPacket
859 (channel, strippeddata, peer_id, channelnum, false);
862 dout_con<<"ProcessPacket returned data of size "
863 <<resultdata.getSize()<<std::endl;
866 e.dataReceived(peer_id, resultdata);
869 }catch(ProcessedSilentlyException &e){
871 }catch(InvalidIncomingDataException &e){
873 catch(ProcessedSilentlyException &e){
878 void Connection::runTimeouts(float dtime)
880 core::list<u16> timeouted_peers;
881 core::map<u16, Peer*>::Iterator j;
882 j = m_peers.getIterator();
883 for(; j.atEnd() == false; j++)
885 Peer *peer = j.getNode()->getValue();
890 peer->timeout_counter += dtime;
891 if(peer->timeout_counter > m_timeout)
894 derr_con<<"RunTimeouts(): Peer "<<peer->id
896 <<" (source=peer->timeout_counter)"
898 // Add peer to the list
899 timeouted_peers.push_back(peer->id);
900 // Don't bother going through the buffers of this one
904 float resend_timeout = peer->resend_timeout;
905 for(u16 i=0; i<CHANNEL_COUNT; i++)
907 core::list<BufferedPacket> timed_outs;
908 core::list<BufferedPacket>::Iterator j;
910 Channel *channel = &peer->channels[i];
912 // Remove timed out incomplete unreliable split packets
913 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
915 // Increment reliable packet times
916 channel->outgoing_reliables.incrementTimeouts(dtime);
918 // Check reliable packet total times, remove peer if
920 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
923 derr_con<<"RunTimeouts(): Peer "<<peer->id
925 <<" (source=reliable packet totaltime)"
927 // Add peer to the to-be-removed list
928 timeouted_peers.push_back(peer->id);
932 // Re-send timed out outgoing reliables
934 timed_outs = channel->
935 outgoing_reliables.getTimedOuts(resend_timeout);
937 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
939 j = timed_outs.begin();
940 for(; j != timed_outs.end(); j++)
942 u16 peer_id = readPeerId(*(j->data));
943 u8 channel = readChannel(*(j->data));
944 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
947 derr_con<<"RE-SENDING timed-out RELIABLE to ";
948 j->address.print(&derr_con);
949 derr_con<<"(t/o="<<resend_timeout<<"): "
950 <<"from_peer_id="<<peer_id
951 <<", channel="<<((int)channel&0xff)
952 <<", seqnum="<<seqnum
957 // Enlarge avg_rtt and resend_timeout:
958 // The rtt will be at least the timeout.
959 // NOTE: This won't affect the timeout of the next
960 // checked channel because it was cached.
961 peer->reportRTT(resend_timeout);
968 peer->ping_timer += dtime;
969 if(peer->ping_timer >= 5.0)
971 // Create and send PING packet
972 SharedBuffer<u8> data(2);
973 writeU8(&data[0], TYPE_CONTROL);
974 writeU8(&data[1], CONTROLTYPE_PING);
975 rawSendAsPacket(peer->id, 0, data, true);
977 peer->ping_timer = 0.0;
984 // Remove timed out peers
985 core::list<u16>::Iterator i = timeouted_peers.begin();
986 for(; i != timeouted_peers.end(); i++)
989 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
990 deletePeer(*i, true);
994 void Connection::serve(u16 port)
996 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
999 m_peer_id = PEER_ID_SERVER;
1001 catch(SocketException &e){
1009 void Connection::connect(Address address)
1011 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1012 <<":"<<address.getPort()<<std::endl;
1014 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1016 throw ConnectionException("Already connected to a server");
1019 Peer *peer = new Peer(PEER_ID_SERVER, address);
1020 m_peers.insert(peer->id, peer);
1024 e.peerAdded(peer->id, peer->address);
1029 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1030 m_peer_id = PEER_ID_INEXISTENT;
1031 SharedBuffer<u8> data(0);
1032 Send(PEER_ID_SERVER, 0, data, true);
1035 void Connection::disconnect()
1037 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1039 // Create and send DISCO packet
1040 SharedBuffer<u8> data(2);
1041 writeU8(&data[0], TYPE_CONTROL);
1042 writeU8(&data[1], CONTROLTYPE_DISCO);
1045 core::map<u16, Peer*>::Iterator j;
1046 j = m_peers.getIterator();
1047 for(; j.atEnd() == false; j++)
1049 Peer *peer = j.getNode()->getValue();
1050 rawSendAsPacket(peer->id, 0, data, false);
1054 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1056 core::map<u16, Peer*>::Iterator j;
1057 j = m_peers.getIterator();
1058 for(; j.atEnd() == false; j++)
1060 Peer *peer = j.getNode()->getValue();
1061 send(peer->id, channelnum, data, reliable);
1065 void Connection::send(u16 peer_id, u8 channelnum,
1066 SharedBuffer<u8> data, bool reliable)
1068 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1070 assert(channelnum < CHANNEL_COUNT);
1072 Peer *peer = getPeerNoEx(peer_id);
1075 Channel *channel = &(peer->channels[channelnum]);
1077 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1079 chunksize_max -= RELIABLE_HEADER_SIZE;
1081 core::list<SharedBuffer<u8> > originals;
1082 originals = makeAutoSplitPacket(data, chunksize_max,
1083 channel->next_outgoing_split_seqnum);
1085 core::list<SharedBuffer<u8> >::Iterator i;
1086 i = originals.begin();
1087 for(; i != originals.end(); i++)
1089 SharedBuffer<u8> original = *i;
1091 sendAsPacket(peer_id, channelnum, original, reliable);
1095 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1096 SharedBuffer<u8> data, bool reliable)
1098 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1099 m_outgoing_queue.push_back(packet);
1102 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1103 SharedBuffer<u8> data, bool reliable)
1105 Peer *peer = getPeerNoEx(peer_id);
1108 Channel *channel = &(peer->channels[channelnum]);
1112 u16 seqnum = channel->next_outgoing_seqnum;
1113 channel->next_outgoing_seqnum++;
1115 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1117 // Add base headers and make a packet
1118 BufferedPacket p = makePacket(peer->address, reliable,
1119 m_protocol_id, m_peer_id, channelnum);
1122 // Buffer the packet
1123 channel->outgoing_reliables.insert(p);
1125 catch(AlreadyExistsException &e)
1127 PrintInfo(derr_con);
1128 derr_con<<"WARNING: Going to send a reliable packet "
1129 "seqnum="<<seqnum<<" that is already "
1130 "in outgoing buffer"<<std::endl;
1139 // Add base headers and make a packet
1140 BufferedPacket p = makePacket(peer->address, data,
1141 m_protocol_id, m_peer_id, channelnum);
1148 void Connection::rawSend(const BufferedPacket &packet)
1151 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1152 } catch(SendFailedException &e){
1153 derr_con<<"Connection::rawSend(): SendFailedException: "
1154 <<packet.address.serializeString()<<std::endl;
1158 Peer* Connection::getPeer(u16 peer_id)
1160 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1163 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1167 assert(node->getValue()->id == peer_id);
1169 return node->getValue();
1172 Peer* Connection::getPeerNoEx(u16 peer_id)
1174 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1181 assert(node->getValue()->id == peer_id);
1183 return node->getValue();
1186 core::list<Peer*> Connection::getPeers()
1188 core::list<Peer*> list;
1189 core::map<u16, Peer*>::Iterator j;
1190 j = m_peers.getIterator();
1191 for(; j.atEnd() == false; j++)
1193 Peer *peer = j.getNode()->getValue();
1194 list.push_back(peer);
1199 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1201 core::map<u16, Peer*>::Iterator j;
1202 j = m_peers.getIterator();
1203 for(; j.atEnd() == false; j++)
1205 Peer *peer = j.getNode()->getValue();
1206 for(u16 i=0; i<CHANNEL_COUNT; i++)
1208 Channel *channel = &peer->channels[i];
1209 SharedBuffer<u8> resultdata;
1210 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1220 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1221 SharedBuffer<u8> &dst)
1223 u16 firstseqnum = 0;
1224 // Clear old packets from start of buffer
1227 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1228 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1229 channel->incoming_reliables.popFirst();
1233 // This happens if all packets are old
1234 }catch(con::NotFoundException)
1237 if(channel->incoming_reliables.empty() == false)
1239 if(firstseqnum == channel->next_incoming_seqnum)
1241 BufferedPacket p = channel->incoming_reliables.popFirst();
1243 peer_id = readPeerId(*p.data);
1244 u8 channelnum = readChannel(*p.data);
1245 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1248 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1249 <<" seqnum="<<seqnum
1250 <<" peer_id="<<peer_id
1251 <<" channel="<<((int)channelnum&0xff)
1254 channel->next_incoming_seqnum++;
1256 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1257 // Get out the inside packet and re-process it
1258 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1259 memcpy(*payload, &p.data[headers_size], payload.getSize());
1261 dst = processPacket(channel, payload, peer_id, channelnum, true);
1268 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1269 SharedBuffer<u8> packetdata, u16 peer_id,
1270 u8 channelnum, bool reliable)
1272 IndentationRaiser iraiser(&(m_indentation));
1274 if(packetdata.getSize() < 1)
1275 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1277 u8 type = readU8(&packetdata[0]);
1279 if(type == TYPE_CONTROL)
1281 if(packetdata.getSize() < 2)
1282 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1284 u8 controltype = readU8(&packetdata[1]);
1286 if(controltype == CONTROLTYPE_ACK)
1288 if(packetdata.getSize() < 4)
1289 throw InvalidIncomingDataException
1290 ("packetdata.getSize() < 4 (ACK header size)");
1292 u16 seqnum = readU16(&packetdata[2]);
1294 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1295 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1296 <<", seqnum="<<seqnum<<std::endl;
1299 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1300 // Get round trip time
1301 float rtt = p.totaltime;
1303 // Let peer calculate stuff according to it
1304 // (avg_rtt and resend_timeout)
1305 Peer *peer = getPeer(peer_id);
1306 peer->reportRTT(rtt);
1308 //PrintInfo(dout_con);
1309 //dout_con<<"RTT = "<<rtt<<std::endl;
1311 /*dout_con<<"OUTGOING: ";
1313 channel->outgoing_reliables.print();
1314 dout_con<<std::endl;*/
1316 catch(NotFoundException &e){
1317 PrintInfo(derr_con);
1318 derr_con<<"WARNING: ACKed packet not "
1323 throw ProcessedSilentlyException("Got an ACK");
1325 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1327 if(packetdata.getSize() < 4)
1328 throw InvalidIncomingDataException
1329 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1330 u16 peer_id_new = readU16(&packetdata[2]);
1332 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1334 if(GetPeerID() != PEER_ID_INEXISTENT)
1336 PrintInfo(derr_con);
1337 derr_con<<"WARNING: Not changing"
1338 " existing peer id."<<std::endl;
1342 dout_con<<"changing."<<std::endl;
1343 SetPeerID(peer_id_new);
1345 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1347 else if(controltype == CONTROLTYPE_PING)
1349 // Just ignore it, the incoming data already reset
1350 // the timeout counter
1352 dout_con<<"PING"<<std::endl;
1353 throw ProcessedSilentlyException("Got a PING");
1355 else if(controltype == CONTROLTYPE_DISCO)
1357 // Just ignore it, the incoming data already reset
1358 // the timeout counter
1360 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1362 if(deletePeer(peer_id, false) == false)
1364 PrintInfo(derr_con);
1365 derr_con<<"DISCO: Peer not found"<<std::endl;
1368 throw ProcessedSilentlyException("Got a DISCO");
1371 PrintInfo(derr_con);
1372 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1373 <<((int)controltype&0xff)<<std::endl;
1374 throw InvalidIncomingDataException("Invalid control type");
1377 else if(type == TYPE_ORIGINAL)
1379 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1380 throw InvalidIncomingDataException
1381 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1383 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1385 // Get the inside packet out and return it
1386 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1387 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1390 else if(type == TYPE_SPLIT)
1392 // We have to create a packet again for buffering
1393 // This isn't actually too bad an idea.
1394 BufferedPacket packet = makePacket(
1395 getPeer(peer_id)->address,
1400 // Buffer the packet
1401 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1402 if(data.getSize() != 0)
1405 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1406 <<"size="<<data.getSize()<<std::endl;
1410 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1411 throw ProcessedSilentlyException("Buffered a split packet chunk");
1413 else if(type == TYPE_RELIABLE)
1415 // Recursive reliable packets not allowed
1416 assert(reliable == false);
1418 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1419 throw InvalidIncomingDataException
1420 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1422 u16 seqnum = readU16(&packetdata[1]);
1424 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1425 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1428 if(is_future_packet)
1429 dout_con<<"BUFFERING";
1430 else if(is_old_packet)
1434 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1435 <<" next="<<channel->next_incoming_seqnum;
1436 dout_con<<" [sending CONTROLTYPE_ACK"
1437 " to peer_id="<<peer_id<<"]";
1438 dout_con<<std::endl;
1441 //assert(channel->incoming_reliables.size() < 100);
1443 // Send a CONTROLTYPE_ACK
1444 SharedBuffer<u8> reply(4);
1445 writeU8(&reply[0], TYPE_CONTROL);
1446 writeU8(&reply[1], CONTROLTYPE_ACK);
1447 writeU16(&reply[2], seqnum);
1448 rawSendAsPacket(peer_id, channelnum, reply, false);
1450 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1451 if(is_future_packet)
1454 dout_con<<"Buffering reliable packet (seqnum="
1455 <<seqnum<<")"<<std::endl;*/
1457 // This one comes later, buffer it.
1458 // Actually we have to make a packet to buffer one.
1459 // Well, we have all the ingredients, so just do it.
1460 BufferedPacket packet = makePacket(
1461 getPeer(peer_id)->address,
1467 channel->incoming_reliables.insert(packet);
1470 dout_con<<"INCOMING: ";
1471 channel->incoming_reliables.print();
1472 dout_con<<std::endl;*/
1474 catch(AlreadyExistsException &e)
1478 throw ProcessedSilentlyException("Buffered future reliable packet");
1480 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1481 else if(is_old_packet)
1483 // An old packet, dump it
1484 throw InvalidIncomingDataException("Got an old reliable packet");
1487 channel->next_incoming_seqnum++;
1489 // Get out the inside packet and re-process it
1490 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1491 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1493 return processPacket(channel, payload, peer_id, channelnum, true);
1497 PrintInfo(derr_con);
1498 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1499 throw InvalidIncomingDataException("Invalid packet type");
1502 // We should never get here.
1503 // If you get here, add an exception or a return to some of the
1504 // above conditionals.
1506 throw BaseException("Error in Channel::ProcessPacket()");
1509 bool Connection::deletePeer(u16 peer_id, bool timeout)
1511 if(m_peers.find(peer_id) == NULL)
1514 Peer *peer = m_peers[peer_id];
1518 e.peerRemoved(peer_id, timeout, peer->address);
1521 delete m_peers[peer_id];
1522 m_peers.remove(peer_id);
1528 ConnectionEvent Connection::getEvent()
1530 if(m_event_queue.size() == 0){
1532 e.type = CONNEVENT_NONE;
1535 return m_event_queue.pop_front();
1538 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1541 return m_event_queue.pop_front(timeout_ms);
1542 } catch(ItemNotFoundException &ex){
1544 e.type = CONNEVENT_NONE;
1549 void Connection::putCommand(ConnectionCommand &c)
1551 m_command_queue.push_back(c);
1554 void Connection::Serve(unsigned short port)
1556 ConnectionCommand c;
1561 void Connection::Connect(Address address)
1563 ConnectionCommand c;
1568 bool Connection::Connected()
1570 JMutexAutoLock peerlock(m_peers_mutex);
1572 if(m_peers.size() != 1)
1575 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1579 if(m_peer_id == PEER_ID_INEXISTENT)
1585 void Connection::Disconnect()
1587 ConnectionCommand c;
1592 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1595 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1596 if(e.type != CONNEVENT_NONE)
1597 dout_con<<getDesc()<<": Receive: got event: "
1598 <<e.describe()<<std::endl;
1600 case CONNEVENT_NONE:
1601 throw NoIncomingDataException("No incoming data");
1602 case CONNEVENT_DATA_RECEIVED:
1603 peer_id = e.peer_id;
1604 data = SharedBuffer<u8>(e.data);
1605 return e.data.getSize();
1606 case CONNEVENT_PEER_ADDED: {
1607 Peer tmp(e.peer_id, e.address);
1608 if(m_bc_peerhandler)
1609 m_bc_peerhandler->peerAdded(&tmp);
1611 case CONNEVENT_PEER_REMOVED: {
1612 Peer tmp(e.peer_id, e.address);
1613 if(m_bc_peerhandler)
1614 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1616 case CONNEVENT_BIND_FAILED:
1617 throw ConnectionBindFailed("Failed to bind socket "
1618 "(port already in use?)");
1621 throw NoIncomingDataException("No incoming data");
1624 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1626 assert(channelnum < CHANNEL_COUNT);
1628 ConnectionCommand c;
1629 c.sendToAll(channelnum, data, reliable);
1633 void Connection::Send(u16 peer_id, u8 channelnum,
1634 SharedBuffer<u8> data, bool reliable)
1636 assert(channelnum < CHANNEL_COUNT);
1638 ConnectionCommand c;
1639 c.send(peer_id, channelnum, data, reliable);
1643 void Connection::RunTimeouts(float dtime)
1648 Address Connection::GetPeerAddress(u16 peer_id)
1650 JMutexAutoLock peerlock(m_peers_mutex);
1651 return getPeer(peer_id)->address;
1654 float Connection::GetPeerAvgRTT(u16 peer_id)
1656 JMutexAutoLock peerlock(m_peers_mutex);
1657 return getPeer(peer_id)->avg_rtt;
1660 void Connection::DeletePeer(u16 peer_id)
1662 ConnectionCommand c;
1663 c.deletePeer(peer_id);
1667 void Connection::PrintInfo(std::ostream &out)
1669 out<<getDesc()<<": ";
1672 void Connection::PrintInfo()
1674 PrintInfo(dout_con);
1677 std::string Connection::getDesc()
1679 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";