3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
29 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
30 u32 protocol_id, u16 sender_peer_id, u8 channel)
32 u32 packet_size = datasize + BASE_HEADER_SIZE;
33 BufferedPacket p(packet_size);
36 writeU32(&p.data[0], protocol_id);
37 writeU16(&p.data[4], sender_peer_id);
38 writeU8(&p.data[6], channel);
40 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
45 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
46 u32 protocol_id, u16 sender_peer_id, u8 channel)
48 return makePacket(address, *data, data.getSize(),
49 protocol_id, sender_peer_id, channel);
52 SharedBuffer<u8> makeOriginalPacket(
53 SharedBuffer<u8> data)
56 u32 packet_size = data.getSize() + header_size;
57 SharedBuffer<u8> b(packet_size);
59 writeU8(&b[0], TYPE_ORIGINAL);
61 memcpy(&b[header_size], *data, data.getSize());
66 core::list<SharedBuffer<u8> > makeSplitPacket(
67 SharedBuffer<u8> data,
71 // Chunk packets, containing the TYPE_SPLIT header
72 core::list<SharedBuffer<u8> > chunks;
74 u32 chunk_header_size = 7;
75 u32 maximum_data_size = chunksize_max - chunk_header_size;
80 end = start + maximum_data_size - 1;
81 if(end > data.getSize() - 1)
82 end = data.getSize() - 1;
84 u32 payload_size = end - start + 1;
85 u32 packet_size = chunk_header_size + payload_size;
87 SharedBuffer<u8> chunk(packet_size);
89 writeU8(&chunk[0], TYPE_SPLIT);
90 writeU16(&chunk[1], seqnum);
91 // [3] u16 chunk_count is written at next stage
92 writeU16(&chunk[5], chunk_num);
93 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
95 chunks.push_back(chunk);
100 while(end != data.getSize() - 1);
102 u16 chunk_count = chunks.getSize();
104 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
105 for(; i != chunks.end(); i++)
108 writeU16(&((*i)[3]), chunk_count);
114 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
115 SharedBuffer<u8> data,
119 u32 original_header_size = 1;
120 core::list<SharedBuffer<u8> > list;
121 if(data.getSize() + original_header_size > chunksize_max)
123 list = makeSplitPacket(data, chunksize_max, split_seqnum);
129 list.push_back(makeOriginalPacket(data));
134 SharedBuffer<u8> makeReliablePacket(
135 SharedBuffer<u8> data,
138 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
139 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
140 <<((unsigned int)data[0]&0xff)<<std::endl;*/
142 u32 packet_size = data.getSize() + header_size;
143 SharedBuffer<u8> b(packet_size);
145 writeU8(&b[0], TYPE_RELIABLE);
146 writeU16(&b[1], seqnum);
148 memcpy(&b[header_size], *data, data.getSize());
150 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
151 <<((unsigned int)data[0]&0xff)<<std::endl;*/
152 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
160 void ReliablePacketBuffer::print()
162 core::list<BufferedPacket>::Iterator i;
164 for(; i != m_list.end(); i++)
166 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
170 bool ReliablePacketBuffer::empty()
172 return m_list.empty();
174 u32 ReliablePacketBuffer::size()
176 return m_list.getSize();
178 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
180 core::list<BufferedPacket>::Iterator i;
182 for(; i != m_list.end(); i++)
184 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
186 <<", comparing to s="<<s<<std::endl;*/
192 RPBSearchResult ReliablePacketBuffer::notFound()
196 u16 ReliablePacketBuffer::getFirstSeqnum()
199 throw NotFoundException("Buffer is empty");
200 BufferedPacket p = *m_list.begin();
201 return readU16(&p.data[BASE_HEADER_SIZE+1]);
203 BufferedPacket ReliablePacketBuffer::popFirst()
206 throw NotFoundException("Buffer is empty");
207 BufferedPacket p = *m_list.begin();
208 core::list<BufferedPacket>::Iterator i = m_list.begin();
212 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
214 RPBSearchResult r = findPacket(seqnum);
216 dout_con<<"Not found"<<std::endl;
217 throw NotFoundException("seqnum not found in buffer");
219 BufferedPacket p = *r;
223 void ReliablePacketBuffer::insert(BufferedPacket &p)
225 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
226 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
227 assert(type == TYPE_RELIABLE);
228 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
230 // Find the right place for the packet and insert it there
232 // If list is empty, just add it
239 // Otherwise find the right place
240 core::list<BufferedPacket>::Iterator i;
242 // Find the first packet in the list which has a higher seqnum
243 for(; i != m_list.end(); i++){
244 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
246 throw AlreadyExistsException("Same seqnum in list");
248 if(seqnum_higher(s, seqnum)){
252 // If we're at the end of the list, add the packet to the
254 if(i == m_list.end())
261 m_list.insert_before(i, p);
264 void ReliablePacketBuffer::incrementTimeouts(float dtime)
266 core::list<BufferedPacket>::Iterator i;
268 for(; i != m_list.end(); i++){
270 i->totaltime += dtime;
274 void ReliablePacketBuffer::resetTimedOuts(float timeout)
276 core::list<BufferedPacket>::Iterator i;
278 for(; i != m_list.end(); i++){
279 if(i->time >= timeout)
284 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
286 core::list<BufferedPacket>::Iterator i;
288 for(; i != m_list.end(); i++){
289 if(i->totaltime >= timeout)
295 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
297 core::list<BufferedPacket> timed_outs;
298 core::list<BufferedPacket>::Iterator i;
300 for(; i != m_list.end(); i++)
302 if(i->time >= timeout)
303 timed_outs.push_back(*i);
312 IncomingSplitBuffer::~IncomingSplitBuffer()
314 core::map<u16, IncomingSplitPacket*>::Iterator i;
315 i = m_buf.getIterator();
316 for(; i.atEnd() == false; i++)
318 delete i.getNode()->getValue();
322 This will throw a GotSplitPacketException when a full
323 split packet is constructed.
325 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
327 u32 headersize = BASE_HEADER_SIZE + 7;
328 assert(p.data.getSize() >= headersize);
329 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
330 assert(type == TYPE_SPLIT);
331 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
332 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
333 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
335 // Add if doesn't exist
336 if(m_buf.find(seqnum) == NULL)
338 IncomingSplitPacket *sp = new IncomingSplitPacket();
339 sp->chunk_count = chunk_count;
340 sp->reliable = reliable;
344 IncomingSplitPacket *sp = m_buf[seqnum];
346 // TODO: These errors should be thrown or something? Dunno.
347 if(chunk_count != sp->chunk_count)
348 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
349 <<" != sp->chunk_count="<<sp->chunk_count
351 if(reliable != sp->reliable)
352 derr_con<<"Connection: WARNING: reliable="<<reliable
353 <<" != sp->reliable="<<sp->reliable
356 // If chunk already exists, cancel
357 if(sp->chunks.find(chunk_num) != NULL)
358 throw AlreadyExistsException("Chunk already in buffer");
360 // Cut chunk data out of packet
361 u32 chunkdatasize = p.data.getSize() - headersize;
362 SharedBuffer<u8> chunkdata(chunkdatasize);
363 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
365 // Set chunk data in buffer
366 sp->chunks[chunk_num] = chunkdata;
368 // If not all chunks are received, return empty buffer
369 if(sp->allReceived() == false)
370 return SharedBuffer<u8>();
372 // Calculate total size
374 core::map<u16, SharedBuffer<u8> >::Iterator i;
375 i = sp->chunks.getIterator();
376 for(; i.atEnd() == false; i++)
378 totalsize += i.getNode()->getValue().getSize();
381 SharedBuffer<u8> fulldata(totalsize);
383 // Copy chunks to data buffer
385 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
388 SharedBuffer<u8> buf = sp->chunks[chunk_i];
389 u16 chunkdatasize = buf.getSize();
390 memcpy(&fulldata[start], *buf, chunkdatasize);
391 start += chunkdatasize;;
394 // Remove sp from buffer
395 m_buf.remove(seqnum);
400 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
402 core::list<u16> remove_queue;
403 core::map<u16, IncomingSplitPacket*>::Iterator i;
404 i = m_buf.getIterator();
405 for(; i.atEnd() == false; i++)
407 IncomingSplitPacket *p = i.getNode()->getValue();
408 // Reliable ones are not removed by timeout
409 if(p->reliable == true)
412 if(p->time >= timeout)
413 remove_queue.push_back(i.getNode()->getKey());
415 core::list<u16>::Iterator j;
416 j = remove_queue.begin();
417 for(; j != remove_queue.end(); j++)
419 dout_con<<"NOTE: Removing timed out unreliable split packet"
432 next_outgoing_seqnum = SEQNUM_INITIAL;
433 next_incoming_seqnum = SEQNUM_INITIAL;
434 next_outgoing_split_seqnum = SEQNUM_INITIAL;
444 Peer::Peer(u16 a_id, Address a_address):
447 timeout_counter(0.0),
451 has_sent_with_id(false),
453 m_max_packets_per_second(10),
462 void Peer::reportRTT(float rtt)
466 if(m_max_packets_per_second < 100)
467 m_max_packets_per_second += 10;
468 } else if(rtt < 0.2){
469 if(m_max_packets_per_second < 100)
470 m_max_packets_per_second += 2;
472 m_max_packets_per_second *= 0.8;
473 if(m_max_packets_per_second < 10)
474 m_max_packets_per_second = 10;
480 else if(avg_rtt < 0.0)
483 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
485 // Calculate resend_timeout
487 /*int reliable_count = 0;
488 for(int i=0; i<CHANNEL_COUNT; i++)
490 reliable_count += channels[i].outgoing_reliables.size();
492 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
493 * ((float)reliable_count * 1);*/
495 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
496 if(timeout < RESEND_TIMEOUT_MIN)
497 timeout = RESEND_TIMEOUT_MIN;
498 if(timeout > RESEND_TIMEOUT_MAX)
499 timeout = RESEND_TIMEOUT_MAX;
500 resend_timeout = timeout;
507 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
508 m_protocol_id(protocol_id),
509 m_max_packet_size(max_packet_size),
512 m_bc_peerhandler(NULL),
513 m_bc_receive_timeout(0),
516 m_socket.setTimeoutMs(5);
521 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
522 PeerHandler *peerhandler):
523 m_protocol_id(protocol_id),
524 m_max_packet_size(max_packet_size),
527 m_bc_peerhandler(peerhandler),
528 m_bc_receive_timeout(0),
531 m_socket.setTimeoutMs(5);
537 Connection::~Connection()
544 void * Connection::Thread()
547 log_register_thread("Connection");
549 dout_con<<"Connection thread started"<<std::endl;
551 u32 curtime = porting::getTimeMs();
552 u32 lasttime = curtime;
556 BEGIN_DEBUG_EXCEPTION_HANDLER
559 curtime = porting::getTimeMs();
560 float dtime = (float)(curtime - lasttime) / 1000.;
568 while(m_command_queue.size() != 0){
569 ConnectionCommand c = m_command_queue.pop_front();
577 END_DEBUG_EXCEPTION_HANDLER(derr_con);
583 void Connection::putEvent(ConnectionEvent &e)
585 assert(e.type != CONNEVENT_NONE);
586 m_event_queue.push_back(e);
589 void Connection::processCommand(ConnectionCommand &c)
593 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
596 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
600 case CONNCMD_CONNECT:
601 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
604 case CONNCMD_DISCONNECT:
605 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
609 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
610 send(c.peer_id, c.channelnum, c.data, c.reliable);
612 case CONNCMD_SEND_TO_ALL:
613 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
614 sendToAll(c.channelnum, c.data, c.reliable);
616 case CONNCMD_DELETE_PEER:
617 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
618 deletePeer(c.peer_id, false);
623 void Connection::send(float dtime)
625 for(core::map<u16, Peer*>::Iterator
626 j = m_peers.getIterator();
627 j.atEnd() == false; j++)
629 Peer *peer = j.getNode()->getValue();
630 peer->m_sendtime_accu += dtime;
631 peer->m_num_sent = 0;
632 peer->m_max_num_sent = peer->m_sendtime_accu *
633 peer->m_max_packets_per_second;
635 Queue<OutgoingPacket> postponed_packets;
636 while(m_outgoing_queue.size() != 0){
637 OutgoingPacket packet = m_outgoing_queue.pop_front();
638 Peer *peer = getPeerNoEx(packet.peer_id);
641 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
642 postponed_packets.push_back(packet);
643 } else if(peer->m_num_sent < peer->m_max_num_sent){
644 rawSendAsPacket(packet.peer_id, packet.channelnum,
645 packet.data, packet.reliable);
648 postponed_packets.push_back(packet);
651 while(postponed_packets.size() != 0){
652 m_outgoing_queue.push_back(postponed_packets.pop_front());
654 for(core::map<u16, Peer*>::Iterator
655 j = m_peers.getIterator();
656 j.atEnd() == false; j++)
658 Peer *peer = j.getNode()->getValue();
659 peer->m_sendtime_accu -= (float)peer->m_num_sent /
660 peer->m_max_packets_per_second;
661 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
662 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
666 // Receive packets from the network and buffers and create ConnectionEvents
667 void Connection::receive()
669 u32 datasize = m_max_packet_size * 2; // Double it just to be safe
670 // TODO: We can not know how many layers of header there are.
671 // For now, just assume there are no other than the base headers.
672 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
673 SharedBuffer<u8> packetdata(packet_maxsize);
675 bool single_wait_done = false;
680 /* Check if some buffer has relevant data */
683 SharedBuffer<u8> resultdata;
684 bool got = getFromBuffers(peer_id, resultdata);
687 e.dataReceived(peer_id, resultdata);
693 if(single_wait_done){
694 if(m_socket.WaitData(0) == false)
698 single_wait_done = true;
701 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
703 if(received_size < 0)
705 if(received_size < BASE_HEADER_SIZE)
707 if(readU32(&packetdata[0]) != m_protocol_id)
710 u16 peer_id = readPeerId(*packetdata);
711 u8 channelnum = readChannel(*packetdata);
712 if(channelnum > CHANNEL_COUNT-1){
714 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
715 throw InvalidIncomingDataException("Channel doesn't exist");
718 if(peer_id == PEER_ID_INEXISTENT)
721 Somebody is trying to send stuff to us with no peer id.
723 Check if the same address and port was added to our peer
725 Allow only entries that have has_sent_with_id==false.
728 core::map<u16, Peer*>::Iterator j;
729 j = m_peers.getIterator();
730 for(; j.atEnd() == false; j++)
732 Peer *peer = j.getNode()->getValue();
733 if(peer->has_sent_with_id)
735 if(peer->address == sender)
740 If no peer was found with the same address and port,
741 we shall assume it is a new peer and create an entry.
745 // Pass on to adding the peer
747 // Else: A peer was found.
750 Peer *peer = j.getNode()->getValue();
753 derr_con<<"WARNING: Assuming unknown peer to be "
754 <<"peer_id="<<peer_id<<std::endl;
759 The peer was not found in our lists. Add it.
761 if(peer_id == PEER_ID_INEXISTENT)
763 // Somebody wants to make a new connection
765 // Get a unique peer id (2 or higher)
768 Find an unused peer id
770 bool out_of_ids = false;
774 if(m_peers.find(peer_id_new) == NULL)
776 // Check for overflow
777 if(peer_id_new == 65535){
784 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
789 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
790 " giving peer_id="<<peer_id_new<<std::endl;
793 Peer *peer = new Peer(peer_id_new, sender);
794 m_peers.insert(peer->id, peer);
796 // Create peer addition event
798 e.peerAdded(peer_id_new, sender);
801 // Create CONTROL packet to tell the peer id to the new peer.
802 SharedBuffer<u8> reply(4);
803 writeU8(&reply[0], TYPE_CONTROL);
804 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
805 writeU16(&reply[2], peer_id_new);
806 sendAsPacket(peer_id_new, 0, reply, true);
808 // We're now talking to a valid peer_id
809 peer_id = peer_id_new;
811 // Go on and process whatever it sent
814 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
819 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
820 // and it is invalid.
822 derr_con<<"Receive(): Peer not found"<<std::endl;
823 throw InvalidIncomingDataException("Peer not found (possible timeout)");
826 Peer *peer = node->getValue();
828 // Validate peer address
829 if(peer->address != sender)
832 derr_con<<"Peer "<<peer_id<<" sending from different address."
833 " Ignoring."<<std::endl;
837 peer->timeout_counter = 0.0;
839 Channel *channel = &(peer->channels[channelnum]);
841 // Throw the received packet to channel->processPacket()
843 // Make a new SharedBuffer from the data without the base headers
844 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
845 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
846 strippeddata.getSize());
849 // Process it (the result is some data with no headers made by us)
850 SharedBuffer<u8> resultdata = processPacket
851 (channel, strippeddata, peer_id, channelnum, false);
854 dout_con<<"ProcessPacket returned data of size "
855 <<resultdata.getSize()<<std::endl;
858 e.dataReceived(peer_id, resultdata);
861 }catch(ProcessedSilentlyException &e){
863 }catch(InvalidIncomingDataException &e){
865 catch(ProcessedSilentlyException &e){
870 void Connection::runTimeouts(float dtime)
872 core::list<u16> timeouted_peers;
873 core::map<u16, Peer*>::Iterator j;
874 j = m_peers.getIterator();
875 for(; j.atEnd() == false; j++)
877 Peer *peer = j.getNode()->getValue();
882 peer->timeout_counter += dtime;
883 if(peer->timeout_counter > m_timeout)
886 derr_con<<"RunTimeouts(): Peer "<<peer->id
888 <<" (source=peer->timeout_counter)"
890 // Add peer to the list
891 timeouted_peers.push_back(peer->id);
892 // Don't bother going through the buffers of this one
896 float resend_timeout = peer->resend_timeout;
897 for(u16 i=0; i<CHANNEL_COUNT; i++)
899 core::list<BufferedPacket> timed_outs;
900 core::list<BufferedPacket>::Iterator j;
902 Channel *channel = &peer->channels[i];
904 // Remove timed out incomplete unreliable split packets
905 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
907 // Increment reliable packet times
908 channel->outgoing_reliables.incrementTimeouts(dtime);
910 // Check reliable packet total times, remove peer if
912 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
915 derr_con<<"RunTimeouts(): Peer "<<peer->id
917 <<" (source=reliable packet totaltime)"
919 // Add peer to the to-be-removed list
920 timeouted_peers.push_back(peer->id);
924 // Re-send timed out outgoing reliables
926 timed_outs = channel->
927 outgoing_reliables.getTimedOuts(resend_timeout);
929 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
931 j = timed_outs.begin();
932 for(; j != timed_outs.end(); j++)
934 u16 peer_id = readPeerId(*(j->data));
935 u8 channel = readChannel(*(j->data));
936 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
939 derr_con<<"RE-SENDING timed-out RELIABLE to ";
940 j->address.print(&derr_con);
941 derr_con<<"(t/o="<<resend_timeout<<"): "
942 <<"from_peer_id="<<peer_id
943 <<", channel="<<((int)channel&0xff)
944 <<", seqnum="<<seqnum
949 // Enlarge avg_rtt and resend_timeout:
950 // The rtt will be at least the timeout.
951 // NOTE: This won't affect the timeout of the next
952 // checked channel because it was cached.
953 peer->reportRTT(resend_timeout);
960 peer->ping_timer += dtime;
961 if(peer->ping_timer >= 5.0)
963 // Create and send PING packet
964 SharedBuffer<u8> data(2);
965 writeU8(&data[0], TYPE_CONTROL);
966 writeU8(&data[1], CONTROLTYPE_PING);
967 rawSendAsPacket(peer->id, 0, data, true);
969 peer->ping_timer = 0.0;
976 // Remove timed out peers
977 core::list<u16>::Iterator i = timeouted_peers.begin();
978 for(; i != timeouted_peers.end(); i++)
981 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
982 deletePeer(*i, true);
986 void Connection::serve(u16 port)
988 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
990 m_peer_id = PEER_ID_SERVER;
993 void Connection::connect(Address address)
995 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
996 <<":"<<address.getPort()<<std::endl;
998 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1000 throw ConnectionException("Already connected to a server");
1003 Peer *peer = new Peer(PEER_ID_SERVER, address);
1004 m_peers.insert(peer->id, peer);
1008 e.peerAdded(peer->id, peer->address);
1013 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1014 m_peer_id = PEER_ID_INEXISTENT;
1015 SharedBuffer<u8> data(0);
1016 Send(PEER_ID_SERVER, 0, data, true);
1019 void Connection::disconnect()
1021 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1023 // Create and send DISCO packet
1024 SharedBuffer<u8> data(2);
1025 writeU8(&data[0], TYPE_CONTROL);
1026 writeU8(&data[1], CONTROLTYPE_DISCO);
1029 core::map<u16, Peer*>::Iterator j;
1030 j = m_peers.getIterator();
1031 for(; j.atEnd() == false; j++)
1033 Peer *peer = j.getNode()->getValue();
1034 rawSendAsPacket(peer->id, 0, data, false);
1038 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1040 core::map<u16, Peer*>::Iterator j;
1041 j = m_peers.getIterator();
1042 for(; j.atEnd() == false; j++)
1044 Peer *peer = j.getNode()->getValue();
1045 send(peer->id, channelnum, data, reliable);
1049 void Connection::send(u16 peer_id, u8 channelnum,
1050 SharedBuffer<u8> data, bool reliable)
1052 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1054 assert(channelnum < CHANNEL_COUNT);
1056 Peer *peer = getPeerNoEx(peer_id);
1059 Channel *channel = &(peer->channels[channelnum]);
1061 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1063 chunksize_max -= RELIABLE_HEADER_SIZE;
1065 core::list<SharedBuffer<u8> > originals;
1066 originals = makeAutoSplitPacket(data, chunksize_max,
1067 channel->next_outgoing_split_seqnum);
1069 core::list<SharedBuffer<u8> >::Iterator i;
1070 i = originals.begin();
1071 for(; i != originals.end(); i++)
1073 SharedBuffer<u8> original = *i;
1075 sendAsPacket(peer_id, channelnum, original, reliable);
1079 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1080 SharedBuffer<u8> data, bool reliable)
1082 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1083 m_outgoing_queue.push_back(packet);
1086 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1087 SharedBuffer<u8> data, bool reliable)
1089 Peer *peer = getPeerNoEx(peer_id);
1092 Channel *channel = &(peer->channels[channelnum]);
1096 u16 seqnum = channel->next_outgoing_seqnum;
1097 channel->next_outgoing_seqnum++;
1099 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1101 // Add base headers and make a packet
1102 BufferedPacket p = makePacket(peer->address, reliable,
1103 m_protocol_id, m_peer_id, channelnum);
1106 // Buffer the packet
1107 channel->outgoing_reliables.insert(p);
1109 catch(AlreadyExistsException &e)
1111 PrintInfo(derr_con);
1112 derr_con<<"WARNING: Going to send a reliable packet "
1113 "seqnum="<<seqnum<<" that is already "
1114 "in outgoing buffer"<<std::endl;
1123 // Add base headers and make a packet
1124 BufferedPacket p = makePacket(peer->address, data,
1125 m_protocol_id, m_peer_id, channelnum);
1132 void Connection::rawSend(const BufferedPacket &packet)
1135 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1136 } catch(SendFailedException &e){
1137 derr_con<<"Connection::rawSend(): SendFailedException: "
1138 <<packet.address.serializeString()<<std::endl;
1142 Peer* Connection::getPeer(u16 peer_id)
1144 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1147 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1151 assert(node->getValue()->id == peer_id);
1153 return node->getValue();
1156 Peer* Connection::getPeerNoEx(u16 peer_id)
1158 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1165 assert(node->getValue()->id == peer_id);
1167 return node->getValue();
1170 core::list<Peer*> Connection::getPeers()
1172 core::list<Peer*> list;
1173 core::map<u16, Peer*>::Iterator j;
1174 j = m_peers.getIterator();
1175 for(; j.atEnd() == false; j++)
1177 Peer *peer = j.getNode()->getValue();
1178 list.push_back(peer);
1183 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1185 core::map<u16, Peer*>::Iterator j;
1186 j = m_peers.getIterator();
1187 for(; j.atEnd() == false; j++)
1189 Peer *peer = j.getNode()->getValue();
1190 for(u16 i=0; i<CHANNEL_COUNT; i++)
1192 Channel *channel = &peer->channels[i];
1193 SharedBuffer<u8> resultdata;
1194 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1204 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1205 SharedBuffer<u8> &dst)
1207 u16 firstseqnum = 0;
1208 // Clear old packets from start of buffer
1211 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1212 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1213 channel->incoming_reliables.popFirst();
1217 // This happens if all packets are old
1218 }catch(con::NotFoundException)
1221 if(channel->incoming_reliables.empty() == false)
1223 if(firstseqnum == channel->next_incoming_seqnum)
1225 BufferedPacket p = channel->incoming_reliables.popFirst();
1227 peer_id = readPeerId(*p.data);
1228 u8 channelnum = readChannel(*p.data);
1229 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1232 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1233 <<" seqnum="<<seqnum
1234 <<" peer_id="<<peer_id
1235 <<" channel="<<((int)channelnum&0xff)
1238 channel->next_incoming_seqnum++;
1240 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1241 // Get out the inside packet and re-process it
1242 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1243 memcpy(*payload, &p.data[headers_size], payload.getSize());
1245 dst = processPacket(channel, payload, peer_id, channelnum, true);
1252 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1253 SharedBuffer<u8> packetdata, u16 peer_id,
1254 u8 channelnum, bool reliable)
1256 IndentationRaiser iraiser(&(m_indentation));
1258 if(packetdata.getSize() < 1)
1259 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1261 u8 type = readU8(&packetdata[0]);
1263 if(type == TYPE_CONTROL)
1265 if(packetdata.getSize() < 2)
1266 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1268 u8 controltype = readU8(&packetdata[1]);
1270 if(controltype == CONTROLTYPE_ACK)
1272 if(packetdata.getSize() < 4)
1273 throw InvalidIncomingDataException
1274 ("packetdata.getSize() < 4 (ACK header size)");
1276 u16 seqnum = readU16(&packetdata[2]);
1278 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1279 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1280 <<", seqnum="<<seqnum<<std::endl;
1283 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1284 // Get round trip time
1285 float rtt = p.totaltime;
1287 // Let peer calculate stuff according to it
1288 // (avg_rtt and resend_timeout)
1289 Peer *peer = getPeer(peer_id);
1290 peer->reportRTT(rtt);
1292 //PrintInfo(dout_con);
1293 //dout_con<<"RTT = "<<rtt<<std::endl;
1295 /*dout_con<<"OUTGOING: ";
1297 channel->outgoing_reliables.print();
1298 dout_con<<std::endl;*/
1300 catch(NotFoundException &e){
1301 PrintInfo(derr_con);
1302 derr_con<<"WARNING: ACKed packet not "
1307 throw ProcessedSilentlyException("Got an ACK");
1309 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1311 if(packetdata.getSize() < 4)
1312 throw InvalidIncomingDataException
1313 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1314 u16 peer_id_new = readU16(&packetdata[2]);
1316 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1318 if(GetPeerID() != PEER_ID_INEXISTENT)
1320 PrintInfo(derr_con);
1321 derr_con<<"WARNING: Not changing"
1322 " existing peer id."<<std::endl;
1326 dout_con<<"changing."<<std::endl;
1327 SetPeerID(peer_id_new);
1329 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1331 else if(controltype == CONTROLTYPE_PING)
1333 // Just ignore it, the incoming data already reset
1334 // the timeout counter
1336 dout_con<<"PING"<<std::endl;
1337 throw ProcessedSilentlyException("Got a PING");
1339 else if(controltype == CONTROLTYPE_DISCO)
1341 // Just ignore it, the incoming data already reset
1342 // the timeout counter
1344 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1346 if(deletePeer(peer_id, false) == false)
1348 PrintInfo(derr_con);
1349 derr_con<<"DISCO: Peer not found"<<std::endl;
1352 throw ProcessedSilentlyException("Got a DISCO");
1355 PrintInfo(derr_con);
1356 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1357 <<((int)controltype&0xff)<<std::endl;
1358 throw InvalidIncomingDataException("Invalid control type");
1361 else if(type == TYPE_ORIGINAL)
1363 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1364 throw InvalidIncomingDataException
1365 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1367 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1369 // Get the inside packet out and return it
1370 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1371 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1374 else if(type == TYPE_SPLIT)
1376 // We have to create a packet again for buffering
1377 // This isn't actually too bad an idea.
1378 BufferedPacket packet = makePacket(
1379 getPeer(peer_id)->address,
1384 // Buffer the packet
1385 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1386 if(data.getSize() != 0)
1389 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1390 <<"size="<<data.getSize()<<std::endl;
1394 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1395 throw ProcessedSilentlyException("Buffered a split packet chunk");
1397 else if(type == TYPE_RELIABLE)
1399 // Recursive reliable packets not allowed
1400 assert(reliable == false);
1402 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1403 throw InvalidIncomingDataException
1404 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1406 u16 seqnum = readU16(&packetdata[1]);
1408 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1409 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1412 if(is_future_packet)
1413 dout_con<<"BUFFERING";
1414 else if(is_old_packet)
1418 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1419 <<" next="<<channel->next_incoming_seqnum;
1420 dout_con<<" [sending CONTROLTYPE_ACK"
1421 " to peer_id="<<peer_id<<"]";
1422 dout_con<<std::endl;
1425 //assert(channel->incoming_reliables.size() < 100);
1427 // Send a CONTROLTYPE_ACK
1428 SharedBuffer<u8> reply(4);
1429 writeU8(&reply[0], TYPE_CONTROL);
1430 writeU8(&reply[1], CONTROLTYPE_ACK);
1431 writeU16(&reply[2], seqnum);
1432 rawSendAsPacket(peer_id, channelnum, reply, false);
1434 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1435 if(is_future_packet)
1438 dout_con<<"Buffering reliable packet (seqnum="
1439 <<seqnum<<")"<<std::endl;*/
1441 // This one comes later, buffer it.
1442 // Actually we have to make a packet to buffer one.
1443 // Well, we have all the ingredients, so just do it.
1444 BufferedPacket packet = makePacket(
1445 getPeer(peer_id)->address,
1451 channel->incoming_reliables.insert(packet);
1454 dout_con<<"INCOMING: ";
1455 channel->incoming_reliables.print();
1456 dout_con<<std::endl;*/
1458 catch(AlreadyExistsException &e)
1462 throw ProcessedSilentlyException("Buffered future reliable packet");
1464 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1465 else if(is_old_packet)
1467 // An old packet, dump it
1468 throw InvalidIncomingDataException("Got an old reliable packet");
1471 channel->next_incoming_seqnum++;
1473 // Get out the inside packet and re-process it
1474 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1475 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1477 return processPacket(channel, payload, peer_id, channelnum, true);
1481 PrintInfo(derr_con);
1482 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1483 throw InvalidIncomingDataException("Invalid packet type");
1486 // We should never get here.
1487 // If you get here, add an exception or a return to some of the
1488 // above conditionals.
1490 throw BaseException("Error in Channel::ProcessPacket()");
1493 bool Connection::deletePeer(u16 peer_id, bool timeout)
1495 if(m_peers.find(peer_id) == NULL)
1498 Peer *peer = m_peers[peer_id];
1502 e.peerRemoved(peer_id, timeout, peer->address);
1505 delete m_peers[peer_id];
1506 m_peers.remove(peer_id);
1512 ConnectionEvent Connection::getEvent()
1514 if(m_event_queue.size() == 0){
1516 e.type = CONNEVENT_NONE;
1519 return m_event_queue.pop_front();
1522 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1525 return m_event_queue.pop_front(timeout_ms);
1526 } catch(ItemNotFoundException &ex){
1528 e.type = CONNEVENT_NONE;
1533 void Connection::putCommand(ConnectionCommand &c)
1535 m_command_queue.push_back(c);
1538 void Connection::Serve(unsigned short port)
1540 ConnectionCommand c;
1545 void Connection::Connect(Address address)
1547 ConnectionCommand c;
1552 bool Connection::Connected()
1554 JMutexAutoLock peerlock(m_peers_mutex);
1556 if(m_peers.size() != 1)
1559 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1563 if(m_peer_id == PEER_ID_INEXISTENT)
1569 void Connection::Disconnect()
1571 ConnectionCommand c;
1576 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1579 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1580 if(e.type != CONNEVENT_NONE)
1581 dout_con<<getDesc()<<": Receive: got event: "
1582 <<e.describe()<<std::endl;
1584 case CONNEVENT_NONE:
1585 throw NoIncomingDataException("No incoming data");
1586 case CONNEVENT_DATA_RECEIVED:
1587 peer_id = e.peer_id;
1588 data = SharedBuffer<u8>(e.data);
1589 return e.data.getSize();
1590 case CONNEVENT_PEER_ADDED: {
1591 Peer tmp(e.peer_id, e.address);
1592 if(m_bc_peerhandler)
1593 m_bc_peerhandler->peerAdded(&tmp);
1595 case CONNEVENT_PEER_REMOVED: {
1596 Peer tmp(e.peer_id, e.address);
1597 if(m_bc_peerhandler)
1598 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1602 throw NoIncomingDataException("No incoming data");
1605 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1607 assert(channelnum < CHANNEL_COUNT);
1609 ConnectionCommand c;
1610 c.sendToAll(channelnum, data, reliable);
1614 void Connection::Send(u16 peer_id, u8 channelnum,
1615 SharedBuffer<u8> data, bool reliable)
1617 assert(channelnum < CHANNEL_COUNT);
1619 ConnectionCommand c;
1620 c.send(peer_id, channelnum, data, reliable);
1624 void Connection::RunTimeouts(float dtime)
1629 Address Connection::GetPeerAddress(u16 peer_id)
1631 JMutexAutoLock peerlock(m_peers_mutex);
1632 return getPeer(peer_id)->address;
1635 float Connection::GetPeerAvgRTT(u16 peer_id)
1637 JMutexAutoLock peerlock(m_peers_mutex);
1638 return getPeer(peer_id)->avg_rtt;
1641 void Connection::DeletePeer(u16 peer_id)
1643 ConnectionCommand c;
1644 c.deletePeer(peer_id);
1648 void Connection::PrintInfo(std::ostream &out)
1650 out<<getDesc()<<": ";
1653 void Connection::PrintInfo()
1655 PrintInfo(dout_con);
1658 std::string Connection::getDesc()
1660 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";