1 #include "connection.h"
3 #include "serialization.h"
// Builds a BufferedPacket of datasize + BASE_HEADER_SIZE bytes: the base header
// (u32 protocol_id at byte 0, u16 sender_peer_id at byte 4, u8 channel at byte 6)
// followed by a copy of the datasize payload bytes pointed to by data.
// NOTE(review): this excerpt omits some original lines (braces, any use of
// `address`, and the return statement) - confirm against the full file.
8 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
9 u32 protocol_id, u16 sender_peer_id, u8 channel)
11 u32 packet_size = datasize + BASE_HEADER_SIZE;
12 BufferedPacket p(packet_size);
// Base header fields, written at fixed byte offsets 0, 4 and 6.
15 writeU32(&p.data[0], protocol_id);
16 writeU16(&p.data[4], sender_peer_id);
17 writeU8(&p.data[6], channel);
// The payload starts directly after the base header.
19 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
// Convenience overload: forwards the buffer's raw pointer (*data) and its
// getSize() to the pointer/size makePacket overload, with the other arguments
// passed through unchanged.
24 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
25 u32 protocol_id, u16 sender_peer_id, u8 channel)
27 return makePacket(address, *data, data.getSize(),
28 protocol_id, sender_peer_id, channel);
// Wraps data in a TYPE_ORIGINAL packet: allocates data.getSize() + header_size
// bytes, writes the type byte at offset 0 and copies data after the header.
// NOTE(review): the declaration of header_size and the return statement are
// not visible in this excerpt.
31 SharedBuffer<u8> makeOriginalPacket(
32 SharedBuffer<u8> data)
35 u32 packet_size = data.getSize() + header_size;
36 SharedBuffer<u8> b(packet_size);
38 writeU8(&b[0], TYPE_ORIGINAL);
40 memcpy(&b[header_size], *data, data.getSize());
// Cuts data into TYPE_SPLIT chunk packets and collects them in a list.
// Each chunk starts with a 7-byte header:
//   [0] u8 TYPE_SPLIT, [1] u16 seqnum, [3] u16 chunk_count, [5] u16 chunk_num
// followed by at most (chunksize_max - 7) payload bytes.
// NOTE(review): the remaining parameters (chunksize_max, seqnum), the
// declarations of start/end/chunk_num, the `do {` opener and the return are
// not visible in this excerpt.
// NOTE(review): the arithmetic is on u32 values; chunksize_max < 7 would wrap
// maximum_data_size, and `data.getSize() - 1` presumably wraps for empty data
// (assuming getSize() is unsigned) - callers appear to be expected to avoid both.
45 core::list<SharedBuffer<u8> > makeSplitPacket(
46 SharedBuffer<u8> data,
50 // Chunk packets, containing the TYPE_SPLIT header
51 core::list<SharedBuffer<u8> > chunks;
53 u32 chunk_header_size = 7;
54 u32 maximum_data_size = chunksize_max - chunk_header_size;
// `end` is the inclusive index of the last payload byte of this chunk,
// clamped to the last byte of data.
59 end = start + maximum_data_size - 1;
60 if(end > data.getSize() - 1)
61 end = data.getSize() - 1;
63 u32 payload_size = end - start + 1;
64 u32 packet_size = chunk_header_size + payload_size;
66 SharedBuffer<u8> chunk(packet_size);
68 writeU8(&chunk[0], TYPE_SPLIT);
69 writeU16(&chunk[1], seqnum);
70 // [3] u16 chunk_count is written at next stage
71 writeU16(&chunk[5], chunk_num);
72 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
74 chunks.push_back(chunk);
// Loop until the chunk just emitted ended on the last byte of data.
79 while(end != data.getSize() - 1);
// Second pass: the total is only known now, so patch it into every chunk header.
// The list size is narrowed to u16 here.
81 u16 chunk_count = chunks.getSize();
83 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
84 for(; i != chunks.end(); i++)
87 writeU16(&((*i)[3]), chunk_count);
// Produces the list of packets needed to carry data: if data plus the 1-byte
// TYPE_ORIGINAL header exceeds chunksize_max, the list comes from
// makeSplitPacket(); the list also receives a single makeOriginalPacket(data).
// NOTE(review): the remaining parameters, the braces/else separating the two
// cases (presumably split OR original, not both) and the return are not
// visible in this excerpt.
93 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
94 SharedBuffer<u8> data,
98 u32 original_header_size = 1;
99 core::list<SharedBuffer<u8> > list;
100 if(data.getSize() + original_header_size > chunksize_max)
102 list = makeSplitPacket(data, chunksize_max, split_seqnum);
108 list.push_back(makeOriginalPacket(data));
// Wraps data in a TYPE_RELIABLE packet: [0] u8 TYPE_RELIABLE, [1] u16 seqnum,
// then a copy of data starting at header_size.
// NOTE(review): the seqnum parameter, the declaration of header_size and the
// return statement are not visible in this excerpt.
113 SharedBuffer<u8> makeReliablePacket(
114 SharedBuffer<u8> data,
117 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
118 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
119 <<((unsigned int)data[0]&0xff)<<std::endl;*/
121 u32 packet_size = data.getSize() + header_size;
122 SharedBuffer<u8> b(packet_size);
124 writeU8(&b[0], TYPE_RELIABLE);
125 writeU16(&b[1], seqnum);
127 memcpy(&b[header_size], *data, data.getSize());
129 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
130 <<((unsigned int)data[0]&0xff)<<std::endl;*/
131 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
// Walks m_list and reads each packet's u16 sequence number, stored one byte
// past the base header (BASE_HEADER_SIZE+1).
// NOTE(review): the iterator initialisation and the statement that outputs `s`
// are not visible in this excerpt.
139 void ReliablePacketBuffer::print()
141 core::list<BufferedPacket>::Iterator i;
143 for(; i != m_list.end(); i++)
145 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
// Returns whether the underlying packet list holds no packets.
149 bool ReliablePacketBuffer::empty()
151 return m_list.empty();
// Returns the number of packets currently held in the list.
153 u32 ReliablePacketBuffer::size()
155 return m_list.getSize();
// Scans m_list, reading each packet's u16 sequence number at
// BASE_HEADER_SIZE+1 for comparison with seqnum.
// NOTE(review): the iterator initialisation, the comparison and the return
// statements are not visible in this excerpt; presumably returns the matching
// iterator or the notFound() value - TODO confirm.
157 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
159 core::list<BufferedPacket>::Iterator i;
161 for(; i != m_list.end(); i++)
163 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
164 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
165 <<", comparing to s="<<s<<std::endl;*/
// NOTE(review): body not visible in this excerpt. Presumably returns the
// sentinel RPBSearchResult that findPacket() yields on a miss - TODO confirm.
171 RPBSearchResult ReliablePacketBuffer::notFound()
// Returns the u16 sequence number (at BASE_HEADER_SIZE+1) of the first packet
// in m_list. Throws NotFoundException("Buffer is empty"); the guarding
// condition is not visible here, presumably an emptiness check.
// Note: the first packet is copied into a local before the field is read.
175 u16 ReliablePacketBuffer::getFirstSeqnum()
178 throw NotFoundException("Buffer is empty");
179 BufferedPacket p = *m_list.begin();
180 return readU16(&p.data[BASE_HEADER_SIZE+1]);
// Copies the first packet of m_list into a local and obtains an iterator to it.
// Throws NotFoundException("Buffer is empty"); the guarding condition is not
// visible here, presumably an emptiness check.
// NOTE(review): the erase and the return are not visible in this excerpt;
// judging by the name, the packet is removed and the copy returned - TODO confirm.
182 BufferedPacket ReliablePacketBuffer::popFirst()
185 throw NotFoundException("Buffer is empty");
186 BufferedPacket p = *m_list.begin();
187 core::list<BufferedPacket>::Iterator i = m_list.begin();
// Looks up the packet with the given seqnum via findPacket() and copies it out.
// On a miss it logs "Not found" to dout_con and throws
// NotFoundException("seqnum not found in buffer").
// NOTE(review): the miss test, the erase and the return are not visible in
// this excerpt.
191 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
193 RPBSearchResult r = findPacket(seqnum);
195 dout_con<<"Not found"<<std::endl;
196 throw NotFoundException("seqnum not found in buffer");
198 BufferedPacket p = *r;
// Inserts a TYPE_RELIABLE packet into m_list, which the comments below describe
// as ordered by sequence number (using seqnum_higher()).
// Preconditions are asserted: p holds at least the base header plus 3 bytes,
// and the type byte right after the base header is TYPE_RELIABLE.
// Throws AlreadyExistsException("Same seqnum in list"); the guarding comparison
// is not visible here, presumably s == seqnum.
// NOTE(review): several lines (the empty-list branch, the iterator
// initialisation, the break, the append at end) are not visible in this excerpt.
202 void ReliablePacketBuffer::insert(BufferedPacket &p)
204 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
205 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
206 assert(type == TYPE_RELIABLE);
207 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
209 // Find the right place for the packet and insert it there
211 // If list is empty, just add it
218 // Otherwise find the right place
219 core::list<BufferedPacket>::Iterator i;
221 // Find the first packet in the list which has a higher seqnum
222 for(; i != m_list.end(); i++){
223 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
225 throw AlreadyExistsException("Same seqnum in list");
227 if(seqnum_higher(s, seqnum)){
231 // If we're at the end of the list, add the packet to the
233 if(i == m_list.end())
240 m_list.insert_before(i, p);
// Adds dtime to the totaltime field of every buffered packet.
// NOTE(review): the iterator initialisation and one loop-body line are not
// visible in this excerpt (presumably the per-packet `time` field is advanced
// as well - TODO confirm).
243 void ReliablePacketBuffer::incrementTimeouts(float dtime)
245 core::list<BufferedPacket>::Iterator i;
247 for(; i != m_list.end(); i++){
249 i->totaltime += dtime;
// Visits every buffered packet and selects those whose time field is >= timeout.
// NOTE(review): the action taken on a selected packet is not visible in this
// excerpt; judging by the name it resets the packet's time - TODO confirm.
253 void ReliablePacketBuffer::resetTimedOuts(float timeout)
255 core::list<BufferedPacket>::Iterator i;
257 for(; i != m_list.end(); i++){
258 if(i->time >= timeout)
// Tests each buffered packet's totaltime against timeout (>=).
// NOTE(review): the return statements are not visible in this excerpt;
// presumably true on the first match and false otherwise.
263 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
265 core::list<BufferedPacket>::Iterator i;
267 for(; i != m_list.end(); i++){
268 if(i->totaltime >= timeout)
// Collects copies of all buffered packets whose time field is >= timeout into
// a new list; the packets stay in m_list.
// NOTE(review): the iterator initialisation and the return are not visible in
// this excerpt.
274 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
276 core::list<BufferedPacket> timed_outs;
277 core::list<BufferedPacket>::Iterator i;
279 for(; i != m_list.end(); i++)
281 if(i->time >= timeout)
282 timed_outs.push_back(*i);
// Deletes every IncomingSplitPacket still stored in m_buf; the map holds raw
// pointers allocated with new in insert().
291 IncomingSplitBuffer::~IncomingSplitBuffer()
293 core::map<u16, IncomingSplitPacket*>::Iterator i;
294 i = m_buf.getIterator();
295 for(; i.atEnd() == false; i++)
297 delete i.getNode()->getValue();
301 This will throw a GotSplitPacketException when a full
302 split packet is constructed.
// Buffers one TYPE_SPLIT chunk and, once allReceived() reports the split
// packet complete, concatenates the chunks in chunk_num order, removes the
// entry from m_buf and hands the data out by throwing.
//
// p        - full packet: base header, then the 7-byte split header
//            ([0] u8 type, [1] u16 seqnum, [3] u16 chunk_count,
//            [5] u16 chunk_num), then the chunk payload.
// reliable - stored on the entry when it is first created; a later mismatch is
//            only logged.
//
// Throws InvalidIncomingDataException if p is shorter than the two headers,
// AlreadyExistsException if chunk_num is already buffered for this seqnum, and
// GotSplitPacketException carrying the reassembled data on completion.
// NOTE(review): this excerpt omits some original lines (braces, the m_buf
// insertion of a new entry, the early return, declarations of totalsize/start).
304 void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
306 u32 headersize = BASE_HEADER_SIZE + 7;
// The packet length is controlled by the remote end, so a short packet is
// rejected with an exception instead of an assert (abort / no check in NDEBUG).
307 if(p.data.getSize() < headersize) throw InvalidIncomingDataException("Split packet is shorter than its headers");
308 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
// The caller dispatches on the type byte, so this is an internal invariant.
309 assert(type == TYPE_SPLIT);
310 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
311 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
312 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
314 // Add if doesn't exist
315 if(m_buf.find(seqnum) == NULL)
317 IncomingSplitPacket *sp = new IncomingSplitPacket();
318 sp->chunk_count = chunk_count;
319 sp->reliable = reliable;
323 IncomingSplitPacket *sp = m_buf[seqnum];
325 // TODO: These errors should be thrown or something? Dunno.
326 if(chunk_count != sp->chunk_count)
327 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
328 <<" != sp->chunk_count="<<sp->chunk_count
330 if(reliable != sp->reliable)
331 derr_con<<"Connection: WARNING: reliable="<<reliable
332 <<" != sp->reliable="<<sp->reliable
335 // If chunk already exists, cancel
336 if(sp->chunks.find(chunk_num) != NULL)
337 throw AlreadyExistsException("Chunk already in buffer");
339 // Cut chunk data out of packet
340 u32 chunkdatasize = p.data.getSize() - headersize;
341 SharedBuffer<u8> chunkdata(chunkdatasize);
342 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
344 // Set chunk data in buffer
345 sp->chunks[chunk_num] = chunkdata;
347 // If not all chunks are received, return
348 if(sp->allReceived() == false)
351 // Calculate total size
353 core::map<u16, SharedBuffer<u8> >::Iterator i;
354 i = sp->chunks.getIterator();
355 for(; i.atEnd() == false; i++)
357 totalsize += i.getNode()->getValue().getSize();
360 SharedBuffer<u8> fulldata(totalsize);
362 // Copy chunks to data buffer
364 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
367 SharedBuffer<u8> buf = sp->chunks[chunk_i];
// Keep the full-width size: totalsize above sums untruncated sizes, so a
// narrowed per-chunk size would desynchronise the copy offsets.
368 u32 chunkdatasize = buf.getSize();
369 memcpy(&fulldata[start], *buf, chunkdatasize);
370 start += chunkdatasize;
373 // Remove sp from buffer
374 m_buf.remove(seqnum);
// Completion is signalled to the caller through an exception, not a return value.
377 throw GotSplitPacketException(fulldata);
// Finds split packets in m_buf whose time field is >= timeout and queues their
// keys (sequence numbers) in remove_queue, then walks that queue and logs each
// one. The comment below says reliable entries are exempt from timeout.
// Removal is done in a second pass over the queued keys rather than while
// iterating m_buf.
// NOTE(review): the statement under the `reliable` test, any use of dtime and
// the actual removal/deletion are not visible in this excerpt.
379 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
381 core::list<u16> remove_queue;
382 core::map<u16, IncomingSplitPacket*>::Iterator i;
383 i = m_buf.getIterator();
384 for(; i.atEnd() == false; i++)
386 IncomingSplitPacket *p = i.getNode()->getValue();
387 // Reliable ones are not removed by timeout
388 if(p->reliable == true)
391 if(p->time >= timeout)
392 remove_queue.push_back(i.getNode()->getKey());
394 core::list<u16>::Iterator j;
395 j = remove_queue.begin();
396 for(; j != remove_queue.end(); j++)
398 dout_con<<"NOTE: Removing timed out unreliable split packet"
411 next_outgoing_seqnum = SEQNUM_INITIAL;
412 next_incoming_seqnum = SEQNUM_INITIAL;
413 next_outgoing_split_seqnum = SEQNUM_INITIAL;
// Initialises a peer record: timeout_counter starts at 0, resend_timeout at a
// fixed 0.5 (the RESEND_TIMEOUT_MINIMUM variant is commented out), and
// has_sent_with_id at false.
// NOTE(review): the storing of a_id / a_address and other member
// initialisations are not visible in this excerpt.
423 Peer::Peer(u16 a_id, Address a_address)
427 timeout_counter = 0.0;
428 //resend_timeout = RESEND_TIMEOUT_MINIMUM;
429 resend_timeout = 0.5;
431 has_sent_with_id = false;
// Feeds one round-trip-time sample into the peer's running average and derives
// resend_timeout from it. The visible update weights the new sample by 0.1 and
// the previous average by 0.9.
// NOTE(review): the first branch of the if/else chain and the statement under
// `avg_rtt < 0.0` are not visible in this excerpt.
437 void Peer::reportRTT(float rtt)
441 else if(avg_rtt < 0.0)
444 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
446 // Calculate resend_timeout
448 /*int reliable_count = 0;
449 for(int i=0; i<CHANNEL_COUNT; i++)
451 reliable_count += channels[i].outgoing_reliables.size();
453 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
454 * ((float)reliable_count * 1);*/
// The timeout is the scaled average, clamped to [RESEND_TIMEOUT_MIN, RESEND_TIMEOUT_MAX].
456 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
457 if(timeout < RESEND_TIMEOUT_MIN)
458 timeout = RESEND_TIMEOUT_MIN;
459 if(timeout > RESEND_TIMEOUT_MAX)
460 timeout = RESEND_TIMEOUT_MAX;
461 resend_timeout = timeout;
// Stores the protocol id, maximum packet size and peer handler, and starts
// with m_peer_id = PEER_ID_NEW. peerhandler must be non-NULL (asserted).
// NOTE(review): the other constructor parameters and any further
// initialisation are not visible in this excerpt.
468 Connection::Connection(
472 PeerHandler *peerhandler
475 assert(peerhandler != NULL);
477 m_protocol_id = protocol_id;
478 m_max_packet_size = max_packet_size;
480 m_peer_id = PEER_ID_NEW;
481 //m_waiting_new_peer_id = false;
483 m_peerhandler = peerhandler;
// Walks m_peers and fetches each Peer pointer.
// NOTE(review): what is done with each peer is not visible in this excerpt;
// presumably it is deleted, since Peer objects are created with new elsewhere
// in this file - TODO confirm.
486 Connection::~Connection()
489 core::map<u16, Peer*>::Iterator j;
490 j = m_peers.getIterator();
491 for(; j.atEnd() == false; j++)
493 Peer *peer = j.getNode()->getValue();
// Switches this connection to the server role by taking PEER_ID_SERVER as its
// own peer id.
// NOTE(review): the use of `port` (presumably binding the socket) is not
// visible in this excerpt.
498 void Connection::Serve(unsigned short port)
501 m_peer_id = PEER_ID_SERVER;
// Registers the server at `address` as peer PEER_ID_SERVER, notifies the peer
// handler, then sends an empty reliable packet on channel 0 while our own id is
// PEER_ID_NEW.
// Throws ConnectionException("Already connected to a server"); the guarding
// test is not visible here, presumably that the lookup below found a node.
504 void Connection::Connect(Address address)
506 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
508 throw ConnectionException("Already connected to a server");
511 Peer *peer = new Peer(PEER_ID_SERVER, address);
512 m_peers.insert(peer->id, peer);
513 m_peerhandler->peerAdded(peer);
517 // Send a dummy packet to server with peer_id = PEER_ID_NEW
518 m_peer_id = PEER_ID_NEW;
519 SharedBuffer<u8> data(0);
520 Send(PEER_ID_SERVER, 0, data, true);
522 //m_waiting_new_peer_id = true;
// Checks three things in order: that exactly one peer is known, that a peer
// with id PEER_ID_SERVER exists, and whether our own id is still PEER_ID_NEW.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably false when a check fails and true otherwise.
525 bool Connection::Connected()
527 if(m_peers.size() != 1)
530 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
534 if(m_peer_id == PEER_ID_NEW)
// Interprets one packet whose base header has already been stripped, and
// dispatches on the type byte at offset 0:
//   TYPE_CONTROL  - ACK (pops the acknowledged packet from outgoing_reliables
//                   and reports its totaltime as RTT), SET_PEER_ID, or PING;
//                   each ends in ProcessedSilentlyException.
//   TYPE_ORIGINAL - copies out the payload after ORIGINAL_HEADER_SIZE.
//   TYPE_SPLIT    - re-wraps the chunk with makePacket() and buffers it in
//                   incoming_splits; throws ProcessedSilentlyException while
//                   the split packet is incomplete.
//   TYPE_RELIABLE - replies with a CONTROLTYPE_ACK, buffers future packets,
//                   rejects old ones, otherwise advances next_incoming_seqnum
//                   and recurses on the inner payload with reliable = true.
// Throws InvalidIncomingDataException on short, malformed, old or nested
// reliable input.
// NOTE(review): this excerpt omits some original lines (remaining parameters
// con/peer_id/channelnum/reliable, braces, try/else keywords, some returns).
540 SharedBuffer<u8> Channel::ProcessPacket(
541 SharedBuffer<u8> packetdata,
547 IndentationRaiser iraiser(&(con->m_indentation));
549 if(packetdata.getSize() < 1)
550 throw InvalidIncomingDataException("packetdata.getSize() < 1");
552 u8 type = readU8(&packetdata[0]);
554 if(type == TYPE_CONTROL)
556 if(packetdata.getSize() < 2)
557 throw InvalidIncomingDataException("packetdata.getSize() < 2");
559 u8 controltype = readU8(&packetdata[1]);
561 if(controltype == CONTROLTYPE_ACK)
563 if(packetdata.getSize() < 4)
564 throw InvalidIncomingDataException
565 ("packetdata.getSize() < 4 (ACK header size)");
567 u16 seqnum = readU16(&packetdata[2]);
569 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
570 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
571 <<", seqnum="<<seqnum<<std::endl;
574 BufferedPacket p = outgoing_reliables.popSeqnum(seqnum);
575 // Get round trip time
576 float rtt = p.totaltime;
578 // Let peer calculate stuff according to it
579 // (avg_rtt and resend_timeout)
580 Peer *peer = con->GetPeer(peer_id);
581 peer->reportRTT(rtt);
583 //con->PrintInfo(dout_con);
584 //dout_con<<"RTT = "<<rtt<<std::endl;
586 /*dout_con<<"OUTGOING: ";
588 outgoing_reliables.print();
589 dout_con<<std::endl;*/
591 catch(NotFoundException &e){
592 con->PrintInfo(derr_con);
593 derr_con<<"WARNING: ACKed packet not "
598 throw ProcessedSilentlyException("Got an ACK");
600 else if(controltype == CONTROLTYPE_SET_PEER_ID)
602 if(packetdata.getSize() < 4)
603 throw InvalidIncomingDataException
604 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
605 u16 peer_id_new = readU16(&packetdata[2]);
607 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
609 if(con->GetPeerID() != PEER_ID_NEW)
611 con->PrintInfo(derr_con);
612 derr_con<<"WARNING: Not changing"
613 " existing peer id."<<std::endl;
617 dout_con<<"changing."<<std::endl;
618 con->SetPeerID(peer_id_new);
620 throw ProcessedSilentlyException("Got a SET_PEER_ID");
622 else if(controltype == CONTROLTYPE_PING)
624 // Just ignore it, the incoming data already reset
625 // the timeout counter
627 dout_con<<"PING"<<std::endl;
// The exception text names the control type actually handled (it used to be a
// copy of the SET_PEER_ID message).
628 throw ProcessedSilentlyException("Got a PING");
631 con->PrintInfo(derr_con);
632 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
633 <<((int)controltype&0xff)<<std::endl;
634 throw InvalidIncomingDataException("Invalid control type");
637 else if(type == TYPE_ORIGINAL)
639 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
640 throw InvalidIncomingDataException
641 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
643 dout_con<<"RETURNING TYPE_ORIGINAL to user"
645 // Get the inside packet out and return it
646 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
647 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
650 else if(type == TYPE_SPLIT)
652 // We have to create a packet again for buffering
653 // This isn't actually too bad an idea.
654 BufferedPacket packet = makePacket(
655 con->GetPeer(peer_id)->address,
657 con->GetProtocolID(),
662 incoming_splits.insert(packet, reliable);
664 // This exception happens when all the pieces of a packet
666 catch(GotSplitPacketException &e)
669 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
670 <<"size="<<e.getData().getSize()<<std::endl;
674 dout_con<<"BUFFERING TYPE_SPLIT"<<std::endl;
675 throw ProcessedSilentlyException("Buffered a split packet chunk");
677 else if(type == TYPE_RELIABLE)
679 // Recursive reliable packets not allowed
// The nesting comes from the remote end, so it is rejected as invalid input
// instead of tripping an assert (abort / no check in NDEBUG).
680 if(reliable) throw InvalidIncomingDataException("Recursive reliable packets not allowed");
682 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
683 throw InvalidIncomingDataException
684 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
686 u16 seqnum = readU16(&packetdata[1]);
688 bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum);
689 bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum);
693 dout_con<<"BUFFERING";
694 else if(is_old_packet)
698 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
699 <<" next="<<next_incoming_seqnum;
700 dout_con<<" [sending CONTROLTYPE_ACK"
701 " to peer_id="<<peer_id<<"]";
705 //assert(incoming_reliables.size() < 100);
707 // Send a CONTROLTYPE_ACK
708 SharedBuffer<u8> reply(4);
709 writeU8(&reply[0], TYPE_CONTROL);
710 writeU8(&reply[1], CONTROLTYPE_ACK);
711 writeU16(&reply[2], seqnum);
712 con->SendAsPacket(peer_id, channelnum, reply, false);
714 //if(seqnum_higher(seqnum, next_incoming_seqnum))
718 dout_con<<"Buffering reliable packet (seqnum="
719 <<seqnum<<")"<<std::endl;*/
721 // This one comes later, buffer it.
722 // Actually we have to make a packet to buffer one.
723 // Well, we have all the ingredients, so just do it.
724 BufferedPacket packet = makePacket(
725 con->GetPeer(peer_id)->address,
727 con->GetProtocolID(),
731 incoming_reliables.insert(packet);
734 dout_con<<"INCOMING: ";
735 incoming_reliables.print();
736 dout_con<<std::endl;*/
738 catch(AlreadyExistsException &e)
742 throw ProcessedSilentlyException("Buffered future reliable packet");
744 //else if(seqnum_higher(next_incoming_seqnum, seqnum))
745 else if(is_old_packet)
747 // An old packet, dump it
748 throw InvalidIncomingDataException("Got an old reliable packet");
751 next_incoming_seqnum++;
753 // Get out the inside packet and re-process it
754 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
755 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
757 return ProcessPacket(payload, con, peer_id, channelnum, true);
761 con->PrintInfo(derr_con);
762 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
763 throw InvalidIncomingDataException("Invalid packet type");
766 // We should never get here.
767 // If you get here, add an exception or a return to some of the
768 // above conditionals.
770 throw BaseException("Error in Channel::ProcessPacket()");
// Drains the channel's buffered reliable packets: first drops packets at the
// head of incoming_reliables that are older than next_incoming_seqnum, then, if
// the head's sequence number equals next_incoming_seqnum, pops it, advances
// next_incoming_seqnum, strips the base and reliable headers and hands the
// payload to ProcessPacket() with reliable = true.
// Throws NoIncomingDataException("No relevant data in buffers") when nothing
// is deliverable.
// NOTE(review): the peer_id parameter, the loop around the clean-up, the
// declaration of firstseqnum and several braces are not visible in this excerpt.
773 SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con,
777 // Clear old packets from start of buffer
780 firstseqnum = incoming_reliables.getFirstSeqnum();
781 if(seqnum_higher(next_incoming_seqnum, firstseqnum))
782 incoming_reliables.popFirst();
786 // This happens if all packets are old
787 }catch(con::NotFoundException)
790 if(incoming_reliables.empty() == false)
792 if(firstseqnum == next_incoming_seqnum)
794 BufferedPacket p = incoming_reliables.popFirst();
// The buffered packet still carries its base header, so sender id and channel
// are read back from it.
796 peer_id = readPeerId(*p.data);
797 u8 channelnum = readChannel(*p.data);
798 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
801 dout_con<<"UNBUFFERING TYPE_RELIABLE"
803 <<" peer_id="<<peer_id
804 <<" channel="<<((int)channelnum&0xff)
807 next_incoming_seqnum++;
809 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
810 // Get out the inside packet and re-process it
811 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
812 memcpy(*payload, &p.data[headers_size], payload.getSize());
814 return ProcessPacket(payload, con, peer_id, channelnum, true);
818 throw NoIncomingDataException("No relevant data in buffers");
// Polls every channel of every known peer via Channel::CheckIncomingBuffers().
// Handlers exist for NoIncomingDataException, InvalidIncomingDataException and
// ProcessedSilentlyException raised by a channel. Throws
// NoIncomingDataException("No relevant data in buffers") when no channel
// yields data.
// NOTE(review): the handler bodies, the arguments of the CheckIncomingBuffers
// call and the return of resultdata are not visible in this excerpt.
821 SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id)
823 core::map<u16, Peer*>::Iterator j;
824 j = m_peers.getIterator();
825 for(; j.atEnd() == false; j++)
827 Peer *peer = j.getNode()->getValue();
828 for(u16 i=0; i<CHANNEL_COUNT; i++)
830 Channel *channel = &peer->channels[i];
832 SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers
837 catch(NoIncomingDataException &e)
840 catch(InvalidIncomingDataException &e)
843 catch(ProcessedSilentlyException &e)
848 throw NoIncomingDataException("No relevant data in buffers");
// Delivers the next user-level payload into data (capacity datasize), sets
// peer_id to the sender and returns the payload size.
// Order of work, as far as visible here:
//   1. Try the per-channel buffers through GetFromBuffers().
//   2. Otherwise read one datagram from m_socket and validate its length,
//      protocol id and channel number.
//   3. For PEER_ID_NEW, match an existing peer by address or allocate a new
//      peer id and send it back in a reliable SET_PEER_ID control packet.
//   4. Check that the peer exists and sends from its recorded address, reset
//      its timeout counter, strip the base header and run
//      Channel::ProcessPacket().
// Throws NoIncomingDataException when there is nothing to deliver,
// InvalidIncomingDataException for malformed input or a too-small data buffer,
// and ConnectionException when peer ids run out.
// NOTE(review): this excerpt omits some original lines (braces, try keywords,
// loop constructs, declarations of sender/peer_id_new, retry logic).
851 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
854 Receive a packet from the network
857 // TODO: We can not know how many layers of header there are.
858 // For now, just assume there are no other than the base headers.
859 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
860 Buffer<u8> packetdata(packet_maxsize);
867 Check if some buffer has relevant data
870 SharedBuffer<u8> resultdata = GetFromBuffers(peer_id);
872 if(datasize < resultdata.getSize())
873 throw InvalidIncomingDataException
874 ("Buffer too small for received data");
876 memcpy(data, *resultdata, resultdata.getSize());
877 return resultdata.getSize();
879 catch(NoIncomingDataException &e)
885 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
887 if(received_size < 0)
888 throw NoIncomingDataException("No incoming data");
889 if(received_size < BASE_HEADER_SIZE)
890 throw InvalidIncomingDataException("No full header received");
891 if(readU32(&packetdata[0]) != m_protocol_id)
892 throw InvalidIncomingDataException("Invalid protocol id");
894 peer_id = readPeerId(*packetdata);
895 u8 channelnum = readChannel(*packetdata);
896 if(channelnum > CHANNEL_COUNT-1){
// Stream the channel as a number, as every other log in this file does; a bare
// u8 is presumably a char type and would be written as a raw character.
898 derr_con<<"Receive(): Invalid channel "<<((int)channelnum&0xff)<<std::endl;
899 throw InvalidIncomingDataException("Channel doesn't exist");
902 if(peer_id == PEER_ID_NEW)
905 Somebody is trying to send stuff to us with no peer id.
907 Check if the same address and port was added to our peer
909 Allow only entries that have has_sent_with_id==false.
912 core::map<u16, Peer*>::Iterator j;
913 j = m_peers.getIterator();
914 for(; j.atEnd() == false; j++)
916 Peer *peer = j.getNode()->getValue();
917 if(peer->has_sent_with_id)
919 if(peer->address == sender)
924 If no peer was found with the same address and port,
925 we shall assume it is a new peer and create an entry.
929 // Pass on to adding the peer
931 // Else: A peer was found.
934 Peer *peer = j.getNode()->getValue();
937 derr_con<<"WARNING: Assuming unknown peer to be "
938 <<"peer_id="<<peer_id<<std::endl;
943 The peer was not found in our lists. Add it.
945 if(peer_id == PEER_ID_NEW)
947 // Somebody wants to make a new connection
949 // Get a unique peer id (2 or higher)
952 Find an unused peer id
957 if(m_peers.find(peer_id_new) == NULL)
959 // Check for overflow
960 if(peer_id_new == 65535)
961 throw ConnectionException
962 ("Connection ran out of peer ids");
967 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_NEW,"
968 " giving peer_id="<<peer_id_new<<std::endl;
971 Peer *peer = new Peer(peer_id_new, sender);
972 m_peers.insert(peer->id, peer);
973 m_peerhandler->peerAdded(peer);
975 // Create CONTROL packet to tell the peer id to the new peer.
976 SharedBuffer<u8> reply(4);
977 writeU8(&reply[0], TYPE_CONTROL);
978 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
979 writeU16(&reply[2], peer_id_new);
980 SendAsPacket(peer_id_new, 0, reply, true);
982 // We're now talking to a valid peer_id
983 peer_id = peer_id_new;
985 // Go on and process whatever it sent
988 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
993 // This means that the peer id of the sender is not PEER_ID_NEW
994 // and it is invalid.
996 derr_con<<"Receive(): Peer not found"<<std::endl;
997 throw InvalidIncomingDataException("Peer not found (possible timeout)");
1000 Peer *peer = node->getValue();
1002 // Validate peer address
1003 if(peer->address != sender)
1005 PrintInfo(derr_con);
1006 derr_con<<"Peer "<<peer_id<<" sending from different address."
1007 " Ignoring."<<std::endl;
1008 throw InvalidIncomingDataException
1009 ("Peer sending from different address");
1010 /*// If there is more data, receive again
1011 if(m_socket.WaitData(0) == true)
1013 throw NoIncomingDataException("No incoming data (2)");*/
1016 peer->timeout_counter = 0.0;
1018 Channel *channel = &(peer->channels[channelnum]);
1020 // Throw the received packet to channel->processPacket()
1022 // Make a new SharedBuffer from the data without the base headers
1023 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1024 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1025 strippeddata.getSize());
1028 // Process it (the result is some data with no headers made by us)
1029 SharedBuffer<u8> resultdata = channel->ProcessPacket
1030 (strippeddata, this, peer_id, channelnum);
1033 dout_con<<"ProcessPacket returned data of size "
1034 <<resultdata.getSize()<<std::endl;
1036 if(datasize < resultdata.getSize())
1037 throw InvalidIncomingDataException
1038 ("Buffer too small for received data");
1040 memcpy(data, *resultdata, resultdata.getSize());
1041 return resultdata.getSize();
1043 catch(ProcessedSilentlyException &e)
1045 // If there is more data, receive again
1046 if(m_socket.WaitData(0) == true)
1049 throw NoIncomingDataException("No incoming data (2)");
1051 catch(InvalidIncomingDataException &e)
1053 // If there is more data, receive again
1054 if(m_socket.WaitData(0) == true)
// Sends the same data on channelnum to every peer in m_peers by calling Send()
// once per peer, with the same reliable flag.
1060 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1062 core::map<u16, Peer*>::Iterator j;
1063 j = m_peers.getIterator();
1064 for(; j.atEnd() == false; j++)
1066 Peer *peer = j.getNode()->getValue();
1067 Send(peer->id, channelnum, data, reliable);
// Sends user data to one peer: splits it with makeAutoSplitPacket() so each
// piece fits the packet size budget, then passes every piece to SendAsPacket().
// channelnum must be below CHANNEL_COUNT (asserted). GetPeer() throws if the
// peer is unknown.
// NOTE(review): the condition guarding the RELIABLE_HEADER_SIZE subtraction is
// not visible in this excerpt (presumably `if(reliable)`).
1071 void Connection::Send(u16 peer_id, u8 channelnum,
1072 SharedBuffer<u8> data, bool reliable)
1074 assert(channelnum < CHANNEL_COUNT);
1076 Peer *peer = GetPeer(peer_id);
1077 Channel *channel = &(peer->channels[channelnum]);
// Budget per piece: the maximum packet size minus the headers added later.
1079 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1081 chunksize_max -= RELIABLE_HEADER_SIZE;
1083 core::list<SharedBuffer<u8> > originals;
1084 originals = makeAutoSplitPacket(data, chunksize_max,
1085 channel->next_outgoing_split_seqnum);
1087 core::list<SharedBuffer<u8> >::Iterator i;
1088 i = originals.begin();
1089 for(; i != originals.end(); i++)
1091 SharedBuffer<u8> original = *i;
1093 SendAsPacket(peer_id, channelnum, original, reliable);
// Sends one already-framed piece of data to a peer. Two paths are visible:
//   - one takes the channel's next outgoing sequence number, wraps data with
//     makeReliablePacket(), adds the base header and stores the packet in
//     outgoing_reliables (a duplicate seqnum is only logged as a warning);
//   - the other only adds the base header.
// NOTE(review): the `if(reliable)`/else structure, the try keyword and the
// RawSend() calls are not visible in this excerpt.
// NOTE(review): the local `SharedBuffer<u8> reliable` shares its name with the
// bool parameter; whether it shadows it depends on scoping not visible here.
1097 void Connection::SendAsPacket(u16 peer_id, u8 channelnum,
1098 SharedBuffer<u8> data, bool reliable)
1100 Peer *peer = GetPeer(peer_id);
1101 Channel *channel = &(peer->channels[channelnum]);
1105 u16 seqnum = channel->next_outgoing_seqnum;
1106 channel->next_outgoing_seqnum++;
1108 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1110 // Add base headers and make a packet
1111 BufferedPacket p = makePacket(peer->address, reliable,
1112 m_protocol_id, m_peer_id, channelnum);
1115 // Buffer the packet
1116 channel->outgoing_reliables.insert(p);
1118 catch(AlreadyExistsException &e)
1120 PrintInfo(derr_con);
1121 derr_con<<"WARNING: Going to send a reliable packet "
1122 "seqnum="<<seqnum<<" that is already "
1123 "in outgoing buffer"<<std::endl;
1132 // Add base headers and make a packet
1133 BufferedPacket p = makePacket(peer->address, data,
1134 m_protocol_id, m_peer_id, channelnum);
// Passes the packet's bytes and destination address to m_socket.Send(); no
// headers are added and nothing is buffered here.
1141 void Connection::RawSend(const BufferedPacket &packet)
1143 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
// Periodic housekeeping, driven by the elapsed time dtime. For every peer:
//   - advances timeout_counter and queues the peer for removal once it
//     exceeds m_timeout;
//   - per channel: expires incomplete unreliable split packets, ages the
//     outgoing reliable packets, queues the peer for removal if any reliable
//     packet's total time reached m_timeout, and collects packets whose resend
//     timer expired (each collected packet also feeds resend_timeout back into
//     reportRTT());
//   - sends a reliable CONTROLTYPE_PING on channel 0 once ping_timer reaches 5.0.
// Finally it notifies the peer handler about each queued peer.
// NOTE(review): this excerpt omits some original lines (braces, continue/goto
// statements, the RawSend of re-sent packets, the deletion of removed peers).
// Note: inside the channel loop the locals `j` and `channel` (u8) reuse names
// from enclosing scopes.
1146 void Connection::RunTimeouts(float dtime)
1148 core::list<u16> timeouted_peers;
1149 core::map<u16, Peer*>::Iterator j;
1150 j = m_peers.getIterator();
1151 for(; j.atEnd() == false; j++)
1153 Peer *peer = j.getNode()->getValue();
1158 peer->timeout_counter += dtime;
1159 if(peer->timeout_counter > m_timeout)
1161 PrintInfo(derr_con);
1162 derr_con<<"RunTimeouts(): Peer "<<peer->id
1164 <<" (source=peer->timeout_counter)"
1166 // Add peer to the list
1167 timeouted_peers.push_back(peer->id);
1168 // Don't bother going through the buffers of this one
// The timeout is cached once per peer, so reportRTT() calls made below do not
// change it for the remaining channels of this pass.
1172 float resend_timeout = peer->resend_timeout;
1173 for(u16 i=0; i<CHANNEL_COUNT; i++)
1175 core::list<BufferedPacket> timed_outs;
1176 core::list<BufferedPacket>::Iterator j;
1178 Channel *channel = &peer->channels[i];
1180 // Remove timed out incomplete unreliable split packets
1181 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
1183 // Increment reliable packet times
1184 channel->outgoing_reliables.incrementTimeouts(dtime);
1186 // Check reliable packet total times, remove peer if
1188 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
1190 PrintInfo(derr_con);
1191 derr_con<<"RunTimeouts(): Peer "<<peer->id
1193 <<" (source=reliable packet totaltime)"
1195 // Add peer to the to-be-removed list
1196 timeouted_peers.push_back(peer->id);
1200 // Re-send timed out outgoing reliables
1202 timed_outs = channel->
1203 outgoing_reliables.getTimedOuts(resend_timeout);
1205 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
1207 j = timed_outs.begin();
1208 for(; j != timed_outs.end(); j++)
1210 u16 peer_id = readPeerId(*(j->data));
1211 u8 channel = readChannel(*(j->data));
1212 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
1214 PrintInfo(derr_con);
1215 derr_con<<"RE-SENDING timed-out RELIABLE to ";
1216 j->address.print(&derr_con);
1217 derr_con<<"(t/o="<<resend_timeout<<"): "
1218 <<"from_peer_id="<<peer_id
1219 <<", channel="<<((int)channel&0xff)
1220 <<", seqnum="<<seqnum
1225 // Enlarge avg_rtt and resend_timeout:
1226 // The rtt will be at least the timeout.
1227 // NOTE: This won't affect the timeout of the next
1228 // checked channel because it was cached.
1229 peer->reportRTT(resend_timeout);
1236 peer->ping_timer += dtime;
1237 if(peer->ping_timer >= 5.0)
1239 // Create and send PING packet
1240 SharedBuffer<u8> data(2);
1241 writeU8(&data[0], TYPE_CONTROL);
1242 writeU8(&data[1], CONTROLTYPE_PING);
1243 SendAsPacket(peer->id, 0, data, true);
1245 peer->ping_timer = 0.0;
1252 // Remove timeouted peers
1253 core::list<u16>::Iterator i = timeouted_peers.begin();
1254 for(; i != timeouted_peers.end(); i++)
1256 PrintInfo(derr_con);
1257 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1258 m_peerhandler->deletingPeer(m_peers[*i], true);
// Looks up a peer by id and returns its Peer pointer.
// Throws PeerNotFoundException("Peer not found (possible timeout)"); the
// guarding test is not visible here, presumably that the lookup returned NULL.
// Asserts that the stored peer's id equals the key it was found under.
1264 Peer* Connection::GetPeer(u16 peer_id)
1266 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1270 throw PeerNotFoundException("Peer not found (possible timeout)");
1274 assert(node->getValue()->id == peer_id);
1276 return node->getValue();
// Variant of GetPeer() that has no throw statement in the visible code.
// Asserts that the stored peer's id equals the key it was found under.
// NOTE(review): the handling of a failed lookup is not visible in this excerpt
// (presumably returns NULL - TODO confirm).
1279 Peer* Connection::GetPeerNoEx(u16 peer_id)
1281 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1288 assert(node->getValue()->id == peer_id);
1290 return node->getValue();
// Builds a list holding the Peer pointer of every entry in m_peers. The
// pointers are copied; the Peer objects are not.
// NOTE(review): the return statement is not visible in this excerpt.
1293 core::list<Peer*> Connection::GetPeers()
1295 core::list<Peer*> list;
1296 core::map<u16, Peer*>::Iterator j;
1297 j = m_peers.getIterator();
1298 for(; j.atEnd() == false; j++)
1300 Peer *peer = j.getNode()->getValue();
1301 list.push_back(peer);
// Writes a log prefix to out: the socket handle, then "con <m_peer_id>: ",
// then a loop that runs m_indentation-1 times.
// NOTE(review): the loop body and any separator between the handle and "con"
// are not visible in this excerpt; the loop presumably emits indentation.
1306 void Connection::PrintInfo(std::ostream &out)
1308 out<<m_socket.GetHandle();
1310 out<<"con "<<m_peer_id<<": ";
1311 for(s16 i=0; i<(s16)m_indentation-1; i++)
// Convenience overload: writes the log prefix to dout_con.
1315 void Connection::PrintInfo()
1317 PrintInfo(dout_con);