return timed_outs;
}
+/*
+ IncomingSplitPacket
+*/
+
+bool IncomingSplitPacket::insert(u32 chunk_num, SharedBuffer<u8> &chunkdata)
+{
+ sanity_check(chunk_num < chunk_count);
+
+ // If chunk already exists, ignore it.
+ // Sometimes two identical packets may arrive when there is network
+ // lag and the server re-sends stuff.
+ if (chunks.find(chunk_num) != chunks.end())
+ return false;
+
+ // Set chunk data in buffer
+ chunks[chunk_num] = chunkdata;
+
+ return true;
+}
+
+SharedBuffer<u8> IncomingSplitPacket::reassemble()
+{
+ sanity_check(allReceived());
+
+ // Calculate total size
+ u32 totalsize = 0;
+ for (const auto &chunk : chunks)
+ totalsize += chunk.second.getSize();
+
+ SharedBuffer<u8> fulldata(totalsize);
+
+ // Copy chunks to data buffer
+ u32 start = 0;
+ for (u32 chunk_i = 0; chunk_i < chunk_count; chunk_i++) {
+ const SharedBuffer<u8> &buf = chunks[chunk_i];
+ memcpy(&fulldata[start], *buf, buf.getSize());
+ start += buf.getSize();
+ }
+
+ return fulldata;
+}
+
/*
IncomingSplitBuffer
*/
delete i.second;
}
}
-/*
- This will throw a GotSplitPacketException when a full
- split packet is constructed.
-*/
+
SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
{
MutexAutoLock listlock(m_map_mutex);
}
// Add if doesn't exist
+ IncomingSplitPacket *sp;
if (m_buf.find(seqnum) == m_buf.end()) {
- m_buf[seqnum] = new IncomingSplitPacket(chunk_count, reliable);
+ sp = new IncomingSplitPacket(chunk_count, reliable);
+ m_buf[seqnum] = sp;
+ } else {
+ sp = m_buf[seqnum];
}
- IncomingSplitPacket *sp = m_buf[seqnum];
-
if (chunk_count != sp->chunk_count) {
errorstream << "IncomingSplitBuffer::insert(): chunk_count="
<< chunk_count << " != sp->chunk_count=" << sp->chunk_count
<<" != sp->reliable="<<sp->reliable
<<std::endl);
- // If chunk already exists, ignore it.
- // Sometimes two identical packets may arrive when there is network
- // lag and the server re-sends stuff.
- if (sp->chunks.find(chunk_num) != sp->chunks.end())
- return SharedBuffer<u8>();
-
// Cut chunk data out of packet
u32 chunkdatasize = p.data.getSize() - headersize;
SharedBuffer<u8> chunkdata(chunkdatasize);
memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
- // Set chunk data in buffer
- sp->chunks[chunk_num] = chunkdata;
+ if (!sp->insert(chunk_num, chunkdata))
+ return SharedBuffer<u8>();
// If not all chunks are received, return empty buffer
if (!sp->allReceived())
return SharedBuffer<u8>();
- // Calculate total size
- u32 totalsize = 0;
- for (const auto &chunk : sp->chunks) {
- totalsize += chunk.second.getSize();
- }
-
- SharedBuffer<u8> fulldata(totalsize);
-
- // Copy chunks to data buffer
- u32 start = 0;
- for (u32 chunk_i=0; chunk_i<sp->chunk_count; chunk_i++) {
- const SharedBuffer<u8> &buf = sp->chunks[chunk_i];
- u16 buf_chunkdatasize = buf.getSize();
- memcpy(&fulldata[start], *buf, buf_chunkdatasize);
- start += buf_chunkdatasize;
- }
+ SharedBuffer<u8> fulldata = sp->reassemble();
// Remove sp from buffer
m_buf.erase(seqnum);
return fulldata;
}
+
void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
{
std::deque<u16> remove_queue;
IncomingSplitPacket() = delete;
- // Key is chunk number, value is data without headers
- std::map<u16, SharedBuffer<u8>> chunks;
- u32 chunk_count;
float time = 0.0f; // Seconds from adding
- bool reliable = false; // If true, isn't deleted on timeout
+ u32 chunk_count;
+ bool reliable; // If true, isn't deleted on timeout
bool allReceived() const
{
return (chunks.size() == chunk_count);
}
+ bool insert(u32 chunk_num, SharedBuffer<u8> &chunkdata);
+ SharedBuffer<u8> reassemble();
+
+private:
+ // Key is chunk number, value is data without headers
+ std::map<u16, SharedBuffer<u8>> chunks;
};
/*