3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
29 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
30 u32 protocol_id, u16 sender_peer_id, u8 channel)
32 u32 packet_size = datasize + BASE_HEADER_SIZE;
33 BufferedPacket p(packet_size);
36 writeU32(&p.data[0], protocol_id);
37 writeU16(&p.data[4], sender_peer_id);
38 writeU8(&p.data[6], channel);
40 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
45 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
46 u32 protocol_id, u16 sender_peer_id, u8 channel)
48 return makePacket(address, *data, data.getSize(),
49 protocol_id, sender_peer_id, channel);
52 SharedBuffer<u8> makeOriginalPacket(
53 SharedBuffer<u8> data)
56 u32 packet_size = data.getSize() + header_size;
57 SharedBuffer<u8> b(packet_size);
59 writeU8(&b[0], TYPE_ORIGINAL);
61 memcpy(&b[header_size], *data, data.getSize());
66 core::list<SharedBuffer<u8> > makeSplitPacket(
67 SharedBuffer<u8> data,
71 // Chunk packets, containing the TYPE_SPLIT header
72 core::list<SharedBuffer<u8> > chunks;
74 u32 chunk_header_size = 7;
75 u32 maximum_data_size = chunksize_max - chunk_header_size;
80 end = start + maximum_data_size - 1;
81 if(end > data.getSize() - 1)
82 end = data.getSize() - 1;
84 u32 payload_size = end - start + 1;
85 u32 packet_size = chunk_header_size + payload_size;
87 SharedBuffer<u8> chunk(packet_size);
89 writeU8(&chunk[0], TYPE_SPLIT);
90 writeU16(&chunk[1], seqnum);
91 // [3] u16 chunk_count is written at next stage
92 writeU16(&chunk[5], chunk_num);
93 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
95 chunks.push_back(chunk);
100 while(end != data.getSize() - 1);
102 u16 chunk_count = chunks.getSize();
104 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
105 for(; i != chunks.end(); i++)
108 writeU16(&((*i)[3]), chunk_count);
114 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
115 SharedBuffer<u8> data,
119 u32 original_header_size = 1;
120 core::list<SharedBuffer<u8> > list;
121 if(data.getSize() + original_header_size > chunksize_max)
123 list = makeSplitPacket(data, chunksize_max, split_seqnum);
129 list.push_back(makeOriginalPacket(data));
134 SharedBuffer<u8> makeReliablePacket(
135 SharedBuffer<u8> data,
138 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
139 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
140 <<((unsigned int)data[0]&0xff)<<std::endl;*/
142 u32 packet_size = data.getSize() + header_size;
143 SharedBuffer<u8> b(packet_size);
145 writeU8(&b[0], TYPE_RELIABLE);
146 writeU16(&b[1], seqnum);
148 memcpy(&b[header_size], *data, data.getSize());
150 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
151 <<((unsigned int)data[0]&0xff)<<std::endl;*/
152 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
160 void ReliablePacketBuffer::print()
162 core::list<BufferedPacket>::Iterator i;
164 for(; i != m_list.end(); i++)
166 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
170 bool ReliablePacketBuffer::empty()
172 return m_list.empty();
174 u32 ReliablePacketBuffer::size()
176 return m_list.getSize();
178 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
180 core::list<BufferedPacket>::Iterator i;
182 for(; i != m_list.end(); i++)
184 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
186 <<", comparing to s="<<s<<std::endl;*/
192 RPBSearchResult ReliablePacketBuffer::notFound()
196 u16 ReliablePacketBuffer::getFirstSeqnum()
199 throw NotFoundException("Buffer is empty");
200 BufferedPacket p = *m_list.begin();
201 return readU16(&p.data[BASE_HEADER_SIZE+1]);
203 BufferedPacket ReliablePacketBuffer::popFirst()
206 throw NotFoundException("Buffer is empty");
207 BufferedPacket p = *m_list.begin();
208 core::list<BufferedPacket>::Iterator i = m_list.begin();
212 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
214 RPBSearchResult r = findPacket(seqnum);
216 dout_con<<"Not found"<<std::endl;
217 throw NotFoundException("seqnum not found in buffer");
219 BufferedPacket p = *r;
223 void ReliablePacketBuffer::insert(BufferedPacket &p)
225 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
226 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
227 assert(type == TYPE_RELIABLE);
228 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
230 // Find the right place for the packet and insert it there
232 // If list is empty, just add it
239 // Otherwise find the right place
240 core::list<BufferedPacket>::Iterator i;
242 // Find the first packet in the list which has a higher seqnum
243 for(; i != m_list.end(); i++){
244 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
246 throw AlreadyExistsException("Same seqnum in list");
248 if(seqnum_higher(s, seqnum)){
252 // If we're at the end of the list, add the packet to the
254 if(i == m_list.end())
261 m_list.insert_before(i, p);
264 void ReliablePacketBuffer::incrementTimeouts(float dtime)
266 core::list<BufferedPacket>::Iterator i;
268 for(; i != m_list.end(); i++){
270 i->totaltime += dtime;
274 void ReliablePacketBuffer::resetTimedOuts(float timeout)
276 core::list<BufferedPacket>::Iterator i;
278 for(; i != m_list.end(); i++){
279 if(i->time >= timeout)
284 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
286 core::list<BufferedPacket>::Iterator i;
288 for(; i != m_list.end(); i++){
289 if(i->totaltime >= timeout)
295 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
297 core::list<BufferedPacket> timed_outs;
298 core::list<BufferedPacket>::Iterator i;
300 for(; i != m_list.end(); i++)
302 if(i->time >= timeout)
303 timed_outs.push_back(*i);
312 IncomingSplitBuffer::~IncomingSplitBuffer()
314 core::map<u16, IncomingSplitPacket*>::Iterator i;
315 i = m_buf.getIterator();
316 for(; i.atEnd() == false; i++)
318 delete i.getNode()->getValue();
322 This will throw a GotSplitPacketException when a full
323 split packet is constructed.
325 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
327 u32 headersize = BASE_HEADER_SIZE + 7;
328 assert(p.data.getSize() >= headersize);
329 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
330 assert(type == TYPE_SPLIT);
331 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
332 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
333 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
335 // Add if doesn't exist
336 if(m_buf.find(seqnum) == NULL)
338 IncomingSplitPacket *sp = new IncomingSplitPacket();
339 sp->chunk_count = chunk_count;
340 sp->reliable = reliable;
344 IncomingSplitPacket *sp = m_buf[seqnum];
346 // TODO: These errors should be thrown or something? Dunno.
347 if(chunk_count != sp->chunk_count)
348 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
349 <<" != sp->chunk_count="<<sp->chunk_count
351 if(reliable != sp->reliable)
352 derr_con<<"Connection: WARNING: reliable="<<reliable
353 <<" != sp->reliable="<<sp->reliable
356 // If chunk already exists, cancel
357 if(sp->chunks.find(chunk_num) != NULL)
358 throw AlreadyExistsException("Chunk already in buffer");
360 // Cut chunk data out of packet
361 u32 chunkdatasize = p.data.getSize() - headersize;
362 SharedBuffer<u8> chunkdata(chunkdatasize);
363 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
365 // Set chunk data in buffer
366 sp->chunks[chunk_num] = chunkdata;
368 // If not all chunks are received, return empty buffer
369 if(sp->allReceived() == false)
370 return SharedBuffer<u8>();
372 // Calculate total size
374 core::map<u16, SharedBuffer<u8> >::Iterator i;
375 i = sp->chunks.getIterator();
376 for(; i.atEnd() == false; i++)
378 totalsize += i.getNode()->getValue().getSize();
381 SharedBuffer<u8> fulldata(totalsize);
383 // Copy chunks to data buffer
385 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
388 SharedBuffer<u8> buf = sp->chunks[chunk_i];
389 u16 chunkdatasize = buf.getSize();
390 memcpy(&fulldata[start], *buf, chunkdatasize);
391 start += chunkdatasize;;
394 // Remove sp from buffer
395 m_buf.remove(seqnum);
400 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
402 core::list<u16> remove_queue;
403 core::map<u16, IncomingSplitPacket*>::Iterator i;
404 i = m_buf.getIterator();
405 for(; i.atEnd() == false; i++)
407 IncomingSplitPacket *p = i.getNode()->getValue();
408 // Reliable ones are not removed by timeout
409 if(p->reliable == true)
412 if(p->time >= timeout)
413 remove_queue.push_back(i.getNode()->getKey());
415 core::list<u16>::Iterator j;
416 j = remove_queue.begin();
417 for(; j != remove_queue.end(); j++)
419 dout_con<<"NOTE: Removing timed out unreliable split packet"
432 next_outgoing_seqnum = SEQNUM_INITIAL;
433 next_incoming_seqnum = SEQNUM_INITIAL;
434 next_outgoing_split_seqnum = SEQNUM_INITIAL;
444 Peer::Peer(u16 a_id, Address a_address):
447 timeout_counter(0.0),
451 has_sent_with_id(false),
453 m_max_packets_per_second(10),
462 void Peer::reportRTT(float rtt)
466 if(m_max_packets_per_second < 400)
467 m_max_packets_per_second += 10;
468 } else if(rtt < 0.2){
469 if(m_max_packets_per_second < 100)
470 m_max_packets_per_second += 2;
472 m_max_packets_per_second *= 0.8;
473 if(m_max_packets_per_second < 10)
474 m_max_packets_per_second = 10;
480 else if(avg_rtt < 0.0)
483 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
485 // Calculate resend_timeout
487 /*int reliable_count = 0;
488 for(int i=0; i<CHANNEL_COUNT; i++)
490 reliable_count += channels[i].outgoing_reliables.size();
492 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
493 * ((float)reliable_count * 1);*/
495 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
496 if(timeout < RESEND_TIMEOUT_MIN)
497 timeout = RESEND_TIMEOUT_MIN;
498 if(timeout > RESEND_TIMEOUT_MAX)
499 timeout = RESEND_TIMEOUT_MAX;
500 resend_timeout = timeout;
507 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
508 m_protocol_id(protocol_id),
509 m_max_packet_size(max_packet_size),
512 m_bc_peerhandler(NULL),
513 m_bc_receive_timeout(0),
516 m_socket.setTimeoutMs(5);
521 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
522 PeerHandler *peerhandler):
523 m_protocol_id(protocol_id),
524 m_max_packet_size(max_packet_size),
527 m_bc_peerhandler(peerhandler),
528 m_bc_receive_timeout(0),
531 m_socket.setTimeoutMs(5);
537 Connection::~Connection()
544 void * Connection::Thread()
547 log_register_thread("Connection");
549 dout_con<<"Connection thread started"<<std::endl;
551 u32 curtime = porting::getTimeMs();
552 u32 lasttime = curtime;
556 BEGIN_DEBUG_EXCEPTION_HANDLER
559 curtime = porting::getTimeMs();
560 float dtime = (float)(curtime - lasttime) / 1000.;
568 while(m_command_queue.size() != 0){
569 ConnectionCommand c = m_command_queue.pop_front();
577 END_DEBUG_EXCEPTION_HANDLER(derr_con);
583 void Connection::putEvent(ConnectionEvent &e)
585 assert(e.type != CONNEVENT_NONE);
586 m_event_queue.push_back(e);
589 void Connection::processCommand(ConnectionCommand &c)
593 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
596 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
600 case CONNCMD_CONNECT:
601 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
604 case CONNCMD_DISCONNECT:
605 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
609 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
610 send(c.peer_id, c.channelnum, c.data, c.reliable);
612 case CONNCMD_SEND_TO_ALL:
613 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
614 sendToAll(c.channelnum, c.data, c.reliable);
616 case CONNCMD_DELETE_PEER:
617 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
618 deletePeer(c.peer_id, false);
623 void Connection::send(float dtime)
625 for(core::map<u16, Peer*>::Iterator
626 j = m_peers.getIterator();
627 j.atEnd() == false; j++)
629 Peer *peer = j.getNode()->getValue();
630 peer->m_sendtime_accu += dtime;
631 peer->m_num_sent = 0;
632 peer->m_max_num_sent = peer->m_sendtime_accu *
633 peer->m_max_packets_per_second;
635 Queue<OutgoingPacket> postponed_packets;
636 while(m_outgoing_queue.size() != 0){
637 OutgoingPacket packet = m_outgoing_queue.pop_front();
638 Peer *peer = getPeerNoEx(packet.peer_id);
641 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
642 postponed_packets.push_back(packet);
643 } else if(peer->m_num_sent < peer->m_max_num_sent){
644 rawSendAsPacket(packet.peer_id, packet.channelnum,
645 packet.data, packet.reliable);
648 postponed_packets.push_back(packet);
651 while(postponed_packets.size() != 0){
652 m_outgoing_queue.push_back(postponed_packets.pop_front());
654 for(core::map<u16, Peer*>::Iterator
655 j = m_peers.getIterator();
656 j.atEnd() == false; j++)
658 Peer *peer = j.getNode()->getValue();
659 peer->m_sendtime_accu -= (float)peer->m_num_sent /
660 peer->m_max_packets_per_second;
661 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
662 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
666 // Receive packets from the network and buffers and create ConnectionEvents
667 void Connection::receive()
669 u32 datasize = m_max_packet_size * 2; // Double it just to be safe
670 // TODO: We can not know how many layers of header there are.
671 // For now, just assume there are no other than the base headers.
672 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
673 SharedBuffer<u8> packetdata(packet_maxsize);
675 bool single_wait_done = false;
680 /* Check if some buffer has relevant data */
683 SharedBuffer<u8> resultdata;
684 bool got = getFromBuffers(peer_id, resultdata);
687 e.dataReceived(peer_id, resultdata);
693 if(single_wait_done){
694 if(m_socket.WaitData(0) == false)
698 single_wait_done = true;
701 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
703 if(received_size < 0)
705 if(received_size < BASE_HEADER_SIZE)
707 if(readU32(&packetdata[0]) != m_protocol_id)
710 u16 peer_id = readPeerId(*packetdata);
711 u8 channelnum = readChannel(*packetdata);
712 if(channelnum > CHANNEL_COUNT-1){
714 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
715 throw InvalidIncomingDataException("Channel doesn't exist");
718 if(peer_id == PEER_ID_INEXISTENT)
721 Somebody is trying to send stuff to us with no peer id.
723 Check if the same address and port was added to our peer
725 Allow only entries that have has_sent_with_id==false.
728 core::map<u16, Peer*>::Iterator j;
729 j = m_peers.getIterator();
730 for(; j.atEnd() == false; j++)
732 Peer *peer = j.getNode()->getValue();
733 if(peer->has_sent_with_id)
735 if(peer->address == sender)
740 If no peer was found with the same address and port,
741 we shall assume it is a new peer and create an entry.
745 // Pass on to adding the peer
747 // Else: A peer was found.
750 Peer *peer = j.getNode()->getValue();
753 derr_con<<"WARNING: Assuming unknown peer to be "
754 <<"peer_id="<<peer_id<<std::endl;
759 The peer was not found in our lists. Add it.
761 if(peer_id == PEER_ID_INEXISTENT)
763 // Somebody wants to make a new connection
765 // Get a unique peer id (2 or higher)
768 Find an unused peer id
770 bool out_of_ids = false;
774 if(m_peers.find(peer_id_new) == NULL)
776 // Check for overflow
777 if(peer_id_new == 65535){
784 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
789 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
790 " giving peer_id="<<peer_id_new<<std::endl;
793 Peer *peer = new Peer(peer_id_new, sender);
794 m_peers.insert(peer->id, peer);
796 // Create peer addition event
798 e.peerAdded(peer_id_new, sender);
801 // Create CONTROL packet to tell the peer id to the new peer.
802 SharedBuffer<u8> reply(4);
803 writeU8(&reply[0], TYPE_CONTROL);
804 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
805 writeU16(&reply[2], peer_id_new);
806 sendAsPacket(peer_id_new, 0, reply, true);
808 // We're now talking to a valid peer_id
809 peer_id = peer_id_new;
811 // Go on and process whatever it sent
814 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
819 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
820 // and it is invalid.
822 derr_con<<"Receive(): Peer not found"<<std::endl;
823 throw InvalidIncomingDataException("Peer not found (possible timeout)");
826 Peer *peer = node->getValue();
828 // Validate peer address
829 if(peer->address != sender)
832 derr_con<<"Peer "<<peer_id<<" sending from different address."
833 " Ignoring."<<std::endl;
837 peer->timeout_counter = 0.0;
839 Channel *channel = &(peer->channels[channelnum]);
841 // Throw the received packet to channel->processPacket()
843 // Make a new SharedBuffer from the data without the base headers
844 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
845 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
846 strippeddata.getSize());
849 // Process it (the result is some data with no headers made by us)
850 SharedBuffer<u8> resultdata = processPacket
851 (channel, strippeddata, peer_id, channelnum, false);
854 dout_con<<"ProcessPacket returned data of size "
855 <<resultdata.getSize()<<std::endl;
858 e.dataReceived(peer_id, resultdata);
861 }catch(ProcessedSilentlyException &e){
863 }catch(InvalidIncomingDataException &e){
865 catch(ProcessedSilentlyException &e){
870 void Connection::runTimeouts(float dtime)
872 core::list<u16> timeouted_peers;
873 core::map<u16, Peer*>::Iterator j;
874 j = m_peers.getIterator();
875 for(; j.atEnd() == false; j++)
877 Peer *peer = j.getNode()->getValue();
882 peer->timeout_counter += dtime;
883 if(peer->timeout_counter > m_timeout)
886 derr_con<<"RunTimeouts(): Peer "<<peer->id
888 <<" (source=peer->timeout_counter)"
890 // Add peer to the list
891 timeouted_peers.push_back(peer->id);
892 // Don't bother going through the buffers of this one
896 float resend_timeout = peer->resend_timeout;
897 for(u16 i=0; i<CHANNEL_COUNT; i++)
899 core::list<BufferedPacket> timed_outs;
900 core::list<BufferedPacket>::Iterator j;
902 Channel *channel = &peer->channels[i];
904 // Remove timed out incomplete unreliable split packets
905 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
907 // Increment reliable packet times
908 channel->outgoing_reliables.incrementTimeouts(dtime);
910 // Check reliable packet total times, remove peer if
912 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
915 derr_con<<"RunTimeouts(): Peer "<<peer->id
917 <<" (source=reliable packet totaltime)"
919 // Add peer to the to-be-removed list
920 timeouted_peers.push_back(peer->id);
924 // Re-send timed out outgoing reliables
926 timed_outs = channel->
927 outgoing_reliables.getTimedOuts(resend_timeout);
929 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
931 j = timed_outs.begin();
932 for(; j != timed_outs.end(); j++)
934 u16 peer_id = readPeerId(*(j->data));
935 u8 channel = readChannel(*(j->data));
936 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
939 derr_con<<"RE-SENDING timed-out RELIABLE to ";
940 j->address.print(&derr_con);
941 derr_con<<"(t/o="<<resend_timeout<<"): "
942 <<"from_peer_id="<<peer_id
943 <<", channel="<<((int)channel&0xff)
944 <<", seqnum="<<seqnum
949 // Enlarge avg_rtt and resend_timeout:
950 // The rtt will be at least the timeout.
951 // NOTE: This won't affect the timeout of the next
952 // checked channel because it was cached.
953 peer->reportRTT(resend_timeout);
960 peer->ping_timer += dtime;
961 if(peer->ping_timer >= 5.0)
963 // Create and send PING packet
964 SharedBuffer<u8> data(2);
965 writeU8(&data[0], TYPE_CONTROL);
966 writeU8(&data[1], CONTROLTYPE_PING);
967 rawSendAsPacket(peer->id, 0, data, true);
969 peer->ping_timer = 0.0;
976 // Remove timed out peers
977 core::list<u16>::Iterator i = timeouted_peers.begin();
978 for(; i != timeouted_peers.end(); i++)
981 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
982 deletePeer(*i, true);
986 void Connection::serve(u16 port)
988 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
991 m_peer_id = PEER_ID_SERVER;
993 catch(SocketException &e){
1001 void Connection::connect(Address address)
1003 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1004 <<":"<<address.getPort()<<std::endl;
1006 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1008 throw ConnectionException("Already connected to a server");
1011 Peer *peer = new Peer(PEER_ID_SERVER, address);
1012 m_peers.insert(peer->id, peer);
1016 e.peerAdded(peer->id, peer->address);
1021 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1022 m_peer_id = PEER_ID_INEXISTENT;
1023 SharedBuffer<u8> data(0);
1024 Send(PEER_ID_SERVER, 0, data, true);
1027 void Connection::disconnect()
1029 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1031 // Create and send DISCO packet
1032 SharedBuffer<u8> data(2);
1033 writeU8(&data[0], TYPE_CONTROL);
1034 writeU8(&data[1], CONTROLTYPE_DISCO);
1037 core::map<u16, Peer*>::Iterator j;
1038 j = m_peers.getIterator();
1039 for(; j.atEnd() == false; j++)
1041 Peer *peer = j.getNode()->getValue();
1042 rawSendAsPacket(peer->id, 0, data, false);
1046 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1048 core::map<u16, Peer*>::Iterator j;
1049 j = m_peers.getIterator();
1050 for(; j.atEnd() == false; j++)
1052 Peer *peer = j.getNode()->getValue();
1053 send(peer->id, channelnum, data, reliable);
1057 void Connection::send(u16 peer_id, u8 channelnum,
1058 SharedBuffer<u8> data, bool reliable)
1060 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1062 assert(channelnum < CHANNEL_COUNT);
1064 Peer *peer = getPeerNoEx(peer_id);
1067 Channel *channel = &(peer->channels[channelnum]);
1069 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1071 chunksize_max -= RELIABLE_HEADER_SIZE;
1073 core::list<SharedBuffer<u8> > originals;
1074 originals = makeAutoSplitPacket(data, chunksize_max,
1075 channel->next_outgoing_split_seqnum);
1077 core::list<SharedBuffer<u8> >::Iterator i;
1078 i = originals.begin();
1079 for(; i != originals.end(); i++)
1081 SharedBuffer<u8> original = *i;
1083 sendAsPacket(peer_id, channelnum, original, reliable);
1087 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1088 SharedBuffer<u8> data, bool reliable)
1090 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1091 m_outgoing_queue.push_back(packet);
1094 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1095 SharedBuffer<u8> data, bool reliable)
1097 Peer *peer = getPeerNoEx(peer_id);
1100 Channel *channel = &(peer->channels[channelnum]);
1104 u16 seqnum = channel->next_outgoing_seqnum;
1105 channel->next_outgoing_seqnum++;
1107 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1109 // Add base headers and make a packet
1110 BufferedPacket p = makePacket(peer->address, reliable,
1111 m_protocol_id, m_peer_id, channelnum);
1114 // Buffer the packet
1115 channel->outgoing_reliables.insert(p);
1117 catch(AlreadyExistsException &e)
1119 PrintInfo(derr_con);
1120 derr_con<<"WARNING: Going to send a reliable packet "
1121 "seqnum="<<seqnum<<" that is already "
1122 "in outgoing buffer"<<std::endl;
1131 // Add base headers and make a packet
1132 BufferedPacket p = makePacket(peer->address, data,
1133 m_protocol_id, m_peer_id, channelnum);
1140 void Connection::rawSend(const BufferedPacket &packet)
1143 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1144 } catch(SendFailedException &e){
1145 derr_con<<"Connection::rawSend(): SendFailedException: "
1146 <<packet.address.serializeString()<<std::endl;
1150 Peer* Connection::getPeer(u16 peer_id)
1152 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1155 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1159 assert(node->getValue()->id == peer_id);
1161 return node->getValue();
1164 Peer* Connection::getPeerNoEx(u16 peer_id)
1166 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1173 assert(node->getValue()->id == peer_id);
1175 return node->getValue();
1178 core::list<Peer*> Connection::getPeers()
1180 core::list<Peer*> list;
1181 core::map<u16, Peer*>::Iterator j;
1182 j = m_peers.getIterator();
1183 for(; j.atEnd() == false; j++)
1185 Peer *peer = j.getNode()->getValue();
1186 list.push_back(peer);
1191 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1193 core::map<u16, Peer*>::Iterator j;
1194 j = m_peers.getIterator();
1195 for(; j.atEnd() == false; j++)
1197 Peer *peer = j.getNode()->getValue();
1198 for(u16 i=0; i<CHANNEL_COUNT; i++)
1200 Channel *channel = &peer->channels[i];
1201 SharedBuffer<u8> resultdata;
1202 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1212 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1213 SharedBuffer<u8> &dst)
1215 u16 firstseqnum = 0;
1216 // Clear old packets from start of buffer
1219 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1220 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1221 channel->incoming_reliables.popFirst();
1225 // This happens if all packets are old
1226 }catch(con::NotFoundException)
1229 if(channel->incoming_reliables.empty() == false)
1231 if(firstseqnum == channel->next_incoming_seqnum)
1233 BufferedPacket p = channel->incoming_reliables.popFirst();
1235 peer_id = readPeerId(*p.data);
1236 u8 channelnum = readChannel(*p.data);
1237 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1240 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1241 <<" seqnum="<<seqnum
1242 <<" peer_id="<<peer_id
1243 <<" channel="<<((int)channelnum&0xff)
1246 channel->next_incoming_seqnum++;
1248 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1249 // Get out the inside packet and re-process it
1250 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1251 memcpy(*payload, &p.data[headers_size], payload.getSize());
1253 dst = processPacket(channel, payload, peer_id, channelnum, true);
1260 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1261 SharedBuffer<u8> packetdata, u16 peer_id,
1262 u8 channelnum, bool reliable)
1264 IndentationRaiser iraiser(&(m_indentation));
1266 if(packetdata.getSize() < 1)
1267 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1269 u8 type = readU8(&packetdata[0]);
1271 if(type == TYPE_CONTROL)
1273 if(packetdata.getSize() < 2)
1274 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1276 u8 controltype = readU8(&packetdata[1]);
1278 if(controltype == CONTROLTYPE_ACK)
1280 if(packetdata.getSize() < 4)
1281 throw InvalidIncomingDataException
1282 ("packetdata.getSize() < 4 (ACK header size)");
1284 u16 seqnum = readU16(&packetdata[2]);
1286 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1287 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1288 <<", seqnum="<<seqnum<<std::endl;
1291 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1292 // Get round trip time
1293 float rtt = p.totaltime;
1295 // Let peer calculate stuff according to it
1296 // (avg_rtt and resend_timeout)
1297 Peer *peer = getPeer(peer_id);
1298 peer->reportRTT(rtt);
1300 //PrintInfo(dout_con);
1301 //dout_con<<"RTT = "<<rtt<<std::endl;
1303 /*dout_con<<"OUTGOING: ";
1305 channel->outgoing_reliables.print();
1306 dout_con<<std::endl;*/
1308 catch(NotFoundException &e){
1309 PrintInfo(derr_con);
1310 derr_con<<"WARNING: ACKed packet not "
1315 throw ProcessedSilentlyException("Got an ACK");
1317 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1319 if(packetdata.getSize() < 4)
1320 throw InvalidIncomingDataException
1321 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1322 u16 peer_id_new = readU16(&packetdata[2]);
1324 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1326 if(GetPeerID() != PEER_ID_INEXISTENT)
1328 PrintInfo(derr_con);
1329 derr_con<<"WARNING: Not changing"
1330 " existing peer id."<<std::endl;
1334 dout_con<<"changing."<<std::endl;
1335 SetPeerID(peer_id_new);
1337 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1339 else if(controltype == CONTROLTYPE_PING)
1341 // Just ignore it, the incoming data already reset
1342 // the timeout counter
1344 dout_con<<"PING"<<std::endl;
1345 throw ProcessedSilentlyException("Got a PING");
1347 else if(controltype == CONTROLTYPE_DISCO)
1349 // Just ignore it, the incoming data already reset
1350 // the timeout counter
1352 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1354 if(deletePeer(peer_id, false) == false)
1356 PrintInfo(derr_con);
1357 derr_con<<"DISCO: Peer not found"<<std::endl;
1360 throw ProcessedSilentlyException("Got a DISCO");
1363 PrintInfo(derr_con);
1364 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1365 <<((int)controltype&0xff)<<std::endl;
1366 throw InvalidIncomingDataException("Invalid control type");
1369 else if(type == TYPE_ORIGINAL)
1371 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1372 throw InvalidIncomingDataException
1373 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1375 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1377 // Get the inside packet out and return it
1378 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1379 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1382 else if(type == TYPE_SPLIT)
1384 // We have to create a packet again for buffering
1385 // This isn't actually too bad an idea.
1386 BufferedPacket packet = makePacket(
1387 getPeer(peer_id)->address,
1392 // Buffer the packet
1393 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1394 if(data.getSize() != 0)
1397 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1398 <<"size="<<data.getSize()<<std::endl;
1402 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1403 throw ProcessedSilentlyException("Buffered a split packet chunk");
1405 else if(type == TYPE_RELIABLE)
1407 // Recursive reliable packets not allowed
1408 assert(reliable == false);
1410 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1411 throw InvalidIncomingDataException
1412 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1414 u16 seqnum = readU16(&packetdata[1]);
1416 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1417 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1420 if(is_future_packet)
1421 dout_con<<"BUFFERING";
1422 else if(is_old_packet)
1426 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1427 <<" next="<<channel->next_incoming_seqnum;
1428 dout_con<<" [sending CONTROLTYPE_ACK"
1429 " to peer_id="<<peer_id<<"]";
1430 dout_con<<std::endl;
1433 //assert(channel->incoming_reliables.size() < 100);
1435 // Send a CONTROLTYPE_ACK
1436 SharedBuffer<u8> reply(4);
1437 writeU8(&reply[0], TYPE_CONTROL);
1438 writeU8(&reply[1], CONTROLTYPE_ACK);
1439 writeU16(&reply[2], seqnum);
1440 rawSendAsPacket(peer_id, channelnum, reply, false);
1442 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1443 if(is_future_packet)
1446 dout_con<<"Buffering reliable packet (seqnum="
1447 <<seqnum<<")"<<std::endl;*/
1449 // This one comes later, buffer it.
1450 // Actually we have to make a packet to buffer one.
1451 // Well, we have all the ingredients, so just do it.
1452 BufferedPacket packet = makePacket(
1453 getPeer(peer_id)->address,
1459 channel->incoming_reliables.insert(packet);
1462 dout_con<<"INCOMING: ";
1463 channel->incoming_reliables.print();
1464 dout_con<<std::endl;*/
1466 catch(AlreadyExistsException &e)
1470 throw ProcessedSilentlyException("Buffered future reliable packet");
1472 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1473 else if(is_old_packet)
1475 // An old packet, dump it
1476 throw InvalidIncomingDataException("Got an old reliable packet");
1479 channel->next_incoming_seqnum++;
1481 // Get out the inside packet and re-process it
1482 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1483 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1485 return processPacket(channel, payload, peer_id, channelnum, true);
1489 PrintInfo(derr_con);
1490 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1491 throw InvalidIncomingDataException("Invalid packet type");
1494 // We should never get here.
1495 // If you get here, add an exception or a return to some of the
1496 // above conditionals.
1498 throw BaseException("Error in Channel::ProcessPacket()");
1501 bool Connection::deletePeer(u16 peer_id, bool timeout)
1503 if(m_peers.find(peer_id) == NULL)
1506 Peer *peer = m_peers[peer_id];
1510 e.peerRemoved(peer_id, timeout, peer->address);
1513 delete m_peers[peer_id];
1514 m_peers.remove(peer_id);
1520 ConnectionEvent Connection::getEvent()
1522 if(m_event_queue.size() == 0){
1524 e.type = CONNEVENT_NONE;
1527 return m_event_queue.pop_front();
1530 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1533 return m_event_queue.pop_front(timeout_ms);
1534 } catch(ItemNotFoundException &ex){
1536 e.type = CONNEVENT_NONE;
1541 void Connection::putCommand(ConnectionCommand &c)
1543 m_command_queue.push_back(c);
1546 void Connection::Serve(unsigned short port)
1548 ConnectionCommand c;
1553 void Connection::Connect(Address address)
1555 ConnectionCommand c;
1560 bool Connection::Connected()
1562 JMutexAutoLock peerlock(m_peers_mutex);
1564 if(m_peers.size() != 1)
1567 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1571 if(m_peer_id == PEER_ID_INEXISTENT)
1577 void Connection::Disconnect()
1579 ConnectionCommand c;
1584 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1587 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1588 if(e.type != CONNEVENT_NONE)
1589 dout_con<<getDesc()<<": Receive: got event: "
1590 <<e.describe()<<std::endl;
1592 case CONNEVENT_NONE:
1593 throw NoIncomingDataException("No incoming data");
1594 case CONNEVENT_DATA_RECEIVED:
1595 peer_id = e.peer_id;
1596 data = SharedBuffer<u8>(e.data);
1597 return e.data.getSize();
1598 case CONNEVENT_PEER_ADDED: {
1599 Peer tmp(e.peer_id, e.address);
1600 if(m_bc_peerhandler)
1601 m_bc_peerhandler->peerAdded(&tmp);
1603 case CONNEVENT_PEER_REMOVED: {
1604 Peer tmp(e.peer_id, e.address);
1605 if(m_bc_peerhandler)
1606 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1608 case CONNEVENT_BIND_FAILED:
1609 throw ConnectionBindFailed("Failed to bind socket "
1610 "(port already in use?)");
1613 throw NoIncomingDataException("No incoming data");
1616 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1618 assert(channelnum < CHANNEL_COUNT);
1620 ConnectionCommand c;
1621 c.sendToAll(channelnum, data, reliable);
1625 void Connection::Send(u16 peer_id, u8 channelnum,
1626 SharedBuffer<u8> data, bool reliable)
1628 assert(channelnum < CHANNEL_COUNT);
1630 ConnectionCommand c;
1631 c.send(peer_id, channelnum, data, reliable);
1635 void Connection::RunTimeouts(float dtime)
1640 Address Connection::GetPeerAddress(u16 peer_id)
1642 JMutexAutoLock peerlock(m_peers_mutex);
1643 return getPeer(peer_id)->address;
1646 float Connection::GetPeerAvgRTT(u16 peer_id)
1648 JMutexAutoLock peerlock(m_peers_mutex);
1649 return getPeer(peer_id)->avg_rtt;
1652 void Connection::DeletePeer(u16 peer_id)
1654 ConnectionCommand c;
1655 c.deletePeer(peer_id);
1659 void Connection::PrintInfo(std::ostream &out)
1661 out<<getDesc()<<": ";
1664 void Connection::PrintInfo()
1666 PrintInfo(dout_con);
1669 std::string Connection::getDesc()
1671 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";