+
+ core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
+
+ if(node == NULL)
+ {
+ // Peer not found
+ // This means that the peer id of the sender is not PEER_ID_INEXISTENT
+ // and it is invalid.
+ PrintInfo(derr_con);
+ derr_con<<"Receive(): Peer not found"<<std::endl;
+ throw InvalidIncomingDataException("Peer not found (possible timeout)");
+ }
+
+ Peer *peer = node->getValue();
+
+ // Validate peer address
+ if(peer->address != sender)
+ {
+ PrintInfo(derr_con);
+ derr_con<<"Peer "<<peer_id<<" sending from different address."
+ " Ignoring."<<std::endl;
+ continue;
+ }
+
+ peer->timeout_counter = 0.0;
+
+ Channel *channel = &(peer->channels[channelnum]);
+
+ // Throw the received packet to channel->processPacket()
+
+ // Make a new SharedBuffer from the data without the base headers
+ SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
+ memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
+ strippeddata.getSize());
+
+ try{
+ // Process it (the result is some data with no headers made by us)
+ SharedBuffer<u8> resultdata = processPacket
+ (channel, strippeddata, peer_id, channelnum, false);
+
+ PrintInfo();
+ dout_con<<"ProcessPacket returned data of size "
+ <<resultdata.getSize()<<std::endl;
+
+ if(datasize < resultdata.getSize())
+ throw InvalidIncomingDataException
+ ("Buffer too small for received data");
+
+ ConnectionEvent e;
+ e.dataReceived(peer_id, resultdata);
+ putEvent(e);
+ continue;
+ }catch(ProcessedSilentlyException &e){
+ }
+ }catch(InvalidIncomingDataException &e){
+ }
+ catch(ProcessedSilentlyException &e){
+ }
+ } // for
+}
+
+void Connection::runTimeouts(float dtime)
+{
+ core::list<u16> timeouted_peers;
+ core::map<u16, Peer*>::Iterator j;
+ j = m_peers.getIterator();
+ for(; j.atEnd() == false; j++)
+ {
+ Peer *peer = j.getNode()->getValue();
+
+ /*
+ Check peer timeout
+ */
+ peer->timeout_counter += dtime;
+ if(peer->timeout_counter > m_timeout)
+ {
+ PrintInfo(derr_con);
+ derr_con<<"RunTimeouts(): Peer "<<peer->id
+ <<" has timed out."
+ <<" (source=peer->timeout_counter)"
+ <<std::endl;
+ // Add peer to the list
+ timeouted_peers.push_back(peer->id);
+ // Don't bother going through the buffers of this one
+ continue;
+ }
+
+ float resend_timeout = peer->resend_timeout;
+ for(u16 i=0; i<CHANNEL_COUNT; i++)
+ {
+ core::list<BufferedPacket> timed_outs;
+ core::list<BufferedPacket>::Iterator j;
+
+ Channel *channel = &peer->channels[i];
+
+ // Remove timed out incomplete unreliable split packets
+ channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
+
+ // Increment reliable packet times
+ channel->outgoing_reliables.incrementTimeouts(dtime);
+
+ // Check reliable packet total times, remove peer if
+ // over timeout.
+ if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
+ {
+ PrintInfo(derr_con);
+ derr_con<<"RunTimeouts(): Peer "<<peer->id
+ <<" has timed out."
+ <<" (source=reliable packet totaltime)"
+ <<std::endl;
+ // Add peer to the to-be-removed list
+ timeouted_peers.push_back(peer->id);
+ goto nextpeer;
+ }
+
+ // Re-send timed out outgoing reliables
+
+ timed_outs = channel->
+ outgoing_reliables.getTimedOuts(resend_timeout);
+
+ channel->outgoing_reliables.resetTimedOuts(resend_timeout);
+
+ j = timed_outs.begin();
+ for(; j != timed_outs.end(); j++)
+ {
+ u16 peer_id = readPeerId(*(j->data));
+ u8 channel = readChannel(*(j->data));
+ u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
+
+ PrintInfo(derr_con);
+ derr_con<<"RE-SENDING timed-out RELIABLE to ";
+ j->address.print(&derr_con);
+ derr_con<<"(t/o="<<resend_timeout<<"): "
+ <<"from_peer_id="<<peer_id
+ <<", channel="<<((int)channel&0xff)
+ <<", seqnum="<<seqnum
+ <<std::endl;
+
+ rawSend(*j);
+
+ // Enlarge avg_rtt and resend_timeout:
+ // The rtt will be at least the timeout.
+ // NOTE: This won't affect the timeout of the next
+ // checked channel because it was cached.
+ peer->reportRTT(resend_timeout);
+ }
+ }
+
+ /*
+ Send pings
+ */
+ peer->ping_timer += dtime;
+ if(peer->ping_timer >= 5.0)
+ {
+ // Create and send PING packet
+ SharedBuffer<u8> data(2);
+ writeU8(&data[0], TYPE_CONTROL);
+ writeU8(&data[1], CONTROLTYPE_PING);
+ rawSendAsPacket(peer->id, 0, data, true);
+
+ peer->ping_timer = 0.0;
+ }
+
+nextpeer:
+ continue;
+ }
+
+ // Remove timed out peers
+ core::list<u16>::Iterator i = timeouted_peers.begin();
+ for(; i != timeouted_peers.end(); i++)
+ {
+ PrintInfo(derr_con);
+ derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
+ deletePeer(*i, true);
+ }
+}
+
+void Connection::serve(u16 port)
+{
+ dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
+ m_socket.Bind(port);
+ m_peer_id = PEER_ID_SERVER;
+}
+
+void Connection::connect(Address address)
+{
+ dout_con<<getDesc()<<" connecting to "<<address.serializeString()
+ <<":"<<address.getPort()<<std::endl;
+
+ core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
+ if(node != NULL){
+ throw ConnectionException("Already connected to a server");
+ }
+
+ Peer *peer = new Peer(PEER_ID_SERVER, address);
+ m_peers.insert(peer->id, peer);
+
+ // Create event
+ ConnectionEvent e;
+ e.peerAdded(peer->id, peer->address);
+ putEvent(e);
+
+ m_socket.Bind(0);
+
+ // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
+ m_peer_id = PEER_ID_INEXISTENT;
+ SharedBuffer<u8> data(0);
+ Send(PEER_ID_SERVER, 0, data, true);
+}
+
+void Connection::disconnect()
+{
+ dout_con<<getDesc()<<" disconnecting"<<std::endl;
+
+ // Create and send DISCO packet
+ SharedBuffer<u8> data(2);
+ writeU8(&data[0], TYPE_CONTROL);
+ writeU8(&data[1], CONTROLTYPE_DISCO);
+
+ // Send to all
+ core::map<u16, Peer*>::Iterator j;
+ j = m_peers.getIterator();
+ for(; j.atEnd() == false; j++)
+ {
+ Peer *peer = j.getNode()->getValue();
+ rawSendAsPacket(peer->id, 0, data, false);
+ }
+}
+
+void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
+{
+ core::map<u16, Peer*>::Iterator j;
+ j = m_peers.getIterator();
+ for(; j.atEnd() == false; j++)
+ {
+ Peer *peer = j.getNode()->getValue();
+ send(peer->id, channelnum, data, reliable);
+ }
+}
+
+void Connection::send(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool reliable)
+{
+ dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
+
+ assert(channelnum < CHANNEL_COUNT);
+
+ Peer *peer = getPeerNoEx(peer_id);
+ if(peer == NULL)
+ return;
+ Channel *channel = &(peer->channels[channelnum]);
+
+ u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
+ if(reliable)
+ chunksize_max -= RELIABLE_HEADER_SIZE;
+
+ core::list<SharedBuffer<u8> > originals;
+ originals = makeAutoSplitPacket(data, chunksize_max,
+ channel->next_outgoing_split_seqnum);
+
+ core::list<SharedBuffer<u8> >::Iterator i;
+ i = originals.begin();
+ for(; i != originals.end(); i++)
+ {
+ SharedBuffer<u8> original = *i;
+
+ sendAsPacket(peer_id, channelnum, original, reliable);
+ }
+}
+
+void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool reliable)
+{
+ OutgoingPacket packet(peer_id, channelnum, data, reliable);
+ m_outgoing_queue.push_back(packet);
+}
+
+void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool reliable)
+{
+ Peer *peer = getPeerNoEx(peer_id);
+ if(!peer)
+ return;
+ Channel *channel = &(peer->channels[channelnum]);
+
+ if(reliable)
+ {
+ u16 seqnum = channel->next_outgoing_seqnum;
+ channel->next_outgoing_seqnum++;
+
+ SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
+
+ // Add base headers and make a packet
+ BufferedPacket p = makePacket(peer->address, reliable,
+ m_protocol_id, m_peer_id, channelnum);
+
+ try{
+ // Buffer the packet
+ channel->outgoing_reliables.insert(p);
+ }
+ catch(AlreadyExistsException &e)
+ {
+ PrintInfo(derr_con);
+ derr_con<<"WARNING: Going to send a reliable packet "
+ "seqnum="<<seqnum<<" that is already "
+ "in outgoing buffer"<<std::endl;
+ //assert(0);
+ }
+
+ // Send the packet
+ rawSend(p);
+ }
+ else
+ {
+ // Add base headers and make a packet
+ BufferedPacket p = makePacket(peer->address, data,
+ m_protocol_id, m_peer_id, channelnum);
+
+ // Send the packet
+ rawSend(p);
+ }
+}
+
+void Connection::rawSend(const BufferedPacket &packet)
+{
+ try{
+ m_socket.Send(packet.address, *packet.data, packet.data.getSize());
+ } catch(SendFailedException &e){
+ derr_con<<"Connection::rawSend(): SendFailedException: "
+ <<packet.address.serializeString()<<std::endl;
+ }
+}
+
+Peer* Connection::getPeer(u16 peer_id)
+{
+ core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
+
+ if(node == NULL){
+ throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
+ }
+
+ // Error checking
+ assert(node->getValue()->id == peer_id);
+
+ return node->getValue();
+}
+
+Peer* Connection::getPeerNoEx(u16 peer_id)
+{
+ core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
+
+ if(node == NULL){
+ return NULL;
+ }
+
+ // Error checking
+ assert(node->getValue()->id == peer_id);
+
+ return node->getValue();
+}
+
+core::list<Peer*> Connection::getPeers()
+{
+ core::list<Peer*> list;
+ core::map<u16, Peer*>::Iterator j;
+ j = m_peers.getIterator();
+ for(; j.atEnd() == false; j++)
+ {
+ Peer *peer = j.getNode()->getValue();
+ list.push_back(peer);
+ }
+ return list;
+}
+
+bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
+{
+ core::map<u16, Peer*>::Iterator j;
+ j = m_peers.getIterator();
+ for(; j.atEnd() == false; j++)
+ {
+ Peer *peer = j.getNode()->getValue();
+ for(u16 i=0; i<CHANNEL_COUNT; i++)
+ {
+ Channel *channel = &peer->channels[i];
+ SharedBuffer<u8> resultdata;
+ bool got = checkIncomingBuffers(channel, peer_id, resultdata);
+ if(got){
+ dst = resultdata;
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
+ SharedBuffer<u8> &dst)
+{
+ u16 firstseqnum = 0;
+ // Clear old packets from start of buffer
+ try{
+ for(;;){
+ firstseqnum = channel->incoming_reliables.getFirstSeqnum();
+ if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
+ channel->incoming_reliables.popFirst();
+ else
+ break;
+ }
+ // This happens if all packets are old
+ }catch(con::NotFoundException)
+ {}
+
+ if(channel->incoming_reliables.empty() == false)
+ {
+ if(firstseqnum == channel->next_incoming_seqnum)
+ {
+ BufferedPacket p = channel->incoming_reliables.popFirst();
+
+ peer_id = readPeerId(*p.data);
+ u8 channelnum = readChannel(*p.data);
+ u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
+
+ PrintInfo();
+ dout_con<<"UNBUFFERING TYPE_RELIABLE"
+ <<" seqnum="<<seqnum
+ <<" peer_id="<<peer_id
+ <<" channel="<<((int)channelnum&0xff)
+ <<std::endl;
+
+ channel->next_incoming_seqnum++;
+
+ u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
+ // Get out the inside packet and re-process it
+ SharedBuffer<u8> payload(p.data.getSize() - headers_size);
+ memcpy(*payload, &p.data[headers_size], payload.getSize());
+
+ dst = processPacket(channel, payload, peer_id, channelnum, true);
+ return true;
+ }
+ }
+ return false;
+}
+
+SharedBuffer<u8> Connection::processPacket(Channel *channel,
+ SharedBuffer<u8> packetdata, u16 peer_id,
+ u8 channelnum, bool reliable)
+{
+ IndentationRaiser iraiser(&(m_indentation));
+
+ if(packetdata.getSize() < 1)
+ throw InvalidIncomingDataException("packetdata.getSize() < 1");
+
+ u8 type = readU8(&packetdata[0]);
+
+ if(type == TYPE_CONTROL)
+ {
+ if(packetdata.getSize() < 2)
+ throw InvalidIncomingDataException("packetdata.getSize() < 2");
+
+ u8 controltype = readU8(&packetdata[1]);
+
+ if(controltype == CONTROLTYPE_ACK)
+ {
+ if(packetdata.getSize() < 4)
+ throw InvalidIncomingDataException
+ ("packetdata.getSize() < 4 (ACK header size)");
+
+ u16 seqnum = readU16(&packetdata[2]);
+ PrintInfo();
+ dout_con<<"Got CONTROLTYPE_ACK: channelnum="
+ <<((int)channelnum&0xff)<<", peer_id="<<peer_id
+ <<", seqnum="<<seqnum<<std::endl;
+
+ try{
+ BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
+ // Get round trip time
+ float rtt = p.totaltime;
+
+ // Let peer calculate stuff according to it
+ // (avg_rtt and resend_timeout)
+ Peer *peer = getPeer(peer_id);
+ peer->reportRTT(rtt);
+
+ //PrintInfo(dout_con);
+ //dout_con<<"RTT = "<<rtt<<std::endl;
+
+ /*dout_con<<"OUTGOING: ";
+ PrintInfo();
+ channel->outgoing_reliables.print();
+ dout_con<<std::endl;*/
+ }
+ catch(NotFoundException &e){
+ PrintInfo(derr_con);
+ derr_con<<"WARNING: ACKed packet not "
+ "in outgoing queue"
+ <<std::endl;
+ }
+
+ throw ProcessedSilentlyException("Got an ACK");
+ }
+ else if(controltype == CONTROLTYPE_SET_PEER_ID)
+ {
+ if(packetdata.getSize() < 4)
+ throw InvalidIncomingDataException
+ ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
+ u16 peer_id_new = readU16(&packetdata[2]);
+ PrintInfo();
+ dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
+
+ if(GetPeerID() != PEER_ID_INEXISTENT)
+ {
+ PrintInfo(derr_con);
+ derr_con<<"WARNING: Not changing"
+ " existing peer id."<<std::endl;
+ }
+ else
+ {
+ dout_con<<"changing."<<std::endl;
+ SetPeerID(peer_id_new);
+ }
+ throw ProcessedSilentlyException("Got a SET_PEER_ID");
+ }
+ else if(controltype == CONTROLTYPE_PING)
+ {
+ // Just ignore it, the incoming data already reset
+ // the timeout counter
+ PrintInfo();
+ dout_con<<"PING"<<std::endl;
+ throw ProcessedSilentlyException("Got a PING");
+ }
+ else if(controltype == CONTROLTYPE_DISCO)
+ {
+ // Just ignore it, the incoming data already reset
+ // the timeout counter
+ PrintInfo();
+ dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
+
+ if(deletePeer(peer_id, false) == false)
+ {
+ PrintInfo(derr_con);
+ derr_con<<"DISCO: Peer not found"<<std::endl;
+ }
+
+ throw ProcessedSilentlyException("Got a DISCO");
+ }
+ else{
+ PrintInfo(derr_con);
+ derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
+ <<((int)controltype&0xff)<<std::endl;
+ throw InvalidIncomingDataException("Invalid control type");