Translated using Weblate (Chinese (Simplified))
[oweals/minetest.git] / src / network / connectionthreads.cpp
1 /*
2 Minetest
3 Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
4 Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 GNU Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
21 #include "connectionthreads.h"
22 #include "log.h"
23 #include "profiler.h"
24 #include "settings.h"
25 #include "network/networkpacket.h"
26 #include "util/serialize.h"
27
28 namespace con
29 {
30
/******************************************************************************/
/* Debugging / profiling switches and tuning constants                        */
/******************************************************************************/
#if defined(NDEBUG)
/* NDEBUG builds: LOG() runs its argument unguarded, PROFILE() expands to
 * nothing. */
#define LOG(a) a
#define PROFILE(a)
#undef DEBUG_CONNECTION_KBPS
#else
/* Held for the duration of every LOG() statement so that messages emitted
 * through LOG() stay consistent. */
std::mutex log_conthread_mutex;
#define LOG(a)                                                                \
	{                                                                         \
	MutexAutoLock loglock(log_conthread_mutex);                               \
	a;                                                                        \
	}
/* Non-NDEBUG builds: profiling statements are compiled in verbatim. */
#define PROFILE(a) a
/* DEBUG_CONNECTION_KBPS stays undefined; it used to be switchable here via
 * "#define DEBUG_CONNECTION_KBPS". */
#undef DEBUG_CONNECTION_KBPS
#endif

/* maximum number of retries for reliable packets */
#define MAX_RELIABLE_RETRY 5

/* NOTE(review): not referenced in the visible part of this file - confirm use. */
#define WINDOW_SIZE 5
55
56 static session_t readPeerId(u8 *packetdata)
57 {
58         return readU16(&packetdata[4]);
59 }
60 static u8 readChannel(u8 *packetdata)
61 {
62         return readU8(&packetdata[6]);
63 }
64
65 /******************************************************************************/
66 /* Connection Threads                                                         */
67 /******************************************************************************/
68
69 ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
70         float timeout) :
71         Thread("ConnectionSend"),
72         m_max_packet_size(max_packet_size),
73         m_timeout(timeout),
74         m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
75 {
76         SANITY_CHECK(m_max_data_packets_per_iteration > 1);
77 }
78
79 void *ConnectionSendThread::run()
80 {
81         assert(m_connection);
82
83         LOG(dout_con << m_connection->getDesc()
84                 << "ConnectionSend thread started" << std::endl);
85
86         u64 curtime = porting::getTimeMs();
87         u64 lasttime = curtime;
88
89         PROFILE(std::stringstream ThreadIdentifier);
90         PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
91
92         /* if stop is requested don't stop immediately but try to send all        */
93         /* packets first */
94         while (!stopRequested() || packetsQueued()) {
95                 BEGIN_DEBUG_EXCEPTION_HANDLER
96                 PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
97
98                 m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
99
100                 /* wait for trigger or timeout */
101                 m_send_sleep_semaphore.wait(50);
102
103                 /* remove all triggers */
104                 while (m_send_sleep_semaphore.wait(0)) {
105                 }
106
107                 lasttime = curtime;
108                 curtime = porting::getTimeMs();
109                 float dtime = CALC_DTIME(lasttime, curtime);
110
111                 /* first resend timed-out packets */
112                 runTimeouts(dtime);
113                 if (m_iteration_packets_avaialble == 0) {
114                         LOG(warningstream << m_connection->getDesc()
115                                 << " Packet quota used up after re-sending packets, "
116                                 << "max=" << m_max_data_packets_per_iteration << std::endl);
117                 }
118
119                 /* translate commands to packets */
120                 ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
121                 while (c.type != CONNCMD_NONE) {
122                         if (c.reliable)
123                                 processReliableCommand(c);
124                         else
125                                 processNonReliableCommand(c);
126
127                         c = m_connection->m_command_queue.pop_frontNoEx(0);
128                 }
129
130                 /* send queued packets */
131                 sendPackets(dtime);
132
133                 END_DEBUG_EXCEPTION_HANDLER
134         }
135
136         PROFILE(g_profiler->remove(ThreadIdentifier.str()));
137         return NULL;
138 }
139
140 void ConnectionSendThread::Trigger()
141 {
142         m_send_sleep_semaphore.post();
143 }
144
145 bool ConnectionSendThread::packetsQueued()
146 {
147         std::list<session_t> peerIds = m_connection->getPeerIDs();
148
149         if (!m_outgoing_queue.empty() && !peerIds.empty())
150                 return true;
151
152         for (session_t peerId : peerIds) {
153                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
154
155                 if (!peer)
156                         continue;
157
158                 if (dynamic_cast<UDPPeer *>(&peer) == 0)
159                         continue;
160
161                 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
162                         if (!channel.queued_commands.empty()) {
163                                 return true;
164                         }
165                 }
166         }
167
168
169         return false;
170 }
171
172 void ConnectionSendThread::runTimeouts(float dtime)
173 {
174         std::list<session_t> timeouted_peers;
175         std::list<session_t> peerIds = m_connection->getPeerIDs();
176
177         for (session_t &peerId : peerIds) {
178                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
179
180                 if (!peer)
181                         continue;
182
183                 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
184                 if (!udpPeer)
185                         continue;
186
187                 PROFILE(std::stringstream peerIdentifier);
188                 PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
189                         << ";" << peerId << ";RELIABLE]");
190                 PROFILE(ScopeProfiler
191                 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
192
193                 SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
194
195                 /*
196                         Check peer timeout
197                 */
198                 if (peer->isTimedOut(m_timeout)) {
199                         infostream << m_connection->getDesc()
200                                 << "RunTimeouts(): Peer " << peer->id
201                                 << " has timed out."
202                                 << std::endl;
203                         // Add peer to the list
204                         timeouted_peers.push_back(peer->id);
205                         // Don't bother going through the buffers of this one
206                         continue;
207                 }
208
209                 float resend_timeout = udpPeer->getResendTimeout();
210                 bool retry_count_exceeded = false;
211                 for (Channel &channel : udpPeer->channels) {
212                         std::list<BufferedPacket> timed_outs;
213
214                         // Remove timed out incomplete unreliable split packets
215                         channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
216
217                         // Increment reliable packet times
218                         channel.outgoing_reliables_sent.incrementTimeouts(dtime);
219
220                         unsigned int numpeers = m_connection->m_peers.size();
221
222                         if (numpeers == 0)
223                                 return;
224
225                         // Re-send timed out outgoing reliables
226                         timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
227                                 (m_max_data_packets_per_iteration / numpeers));
228
229                         channel.UpdatePacketLossCounter(timed_outs.size());
230                         g_profiler->graphAdd("packets_lost", timed_outs.size());
231
232                         m_iteration_packets_avaialble -= timed_outs.size();
233
234                         for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
235                                 k != timed_outs.end(); ++k) {
236                                 session_t peer_id = readPeerId(*(k->data));
237                                 u8 channelnum = readChannel(*(k->data));
238                                 u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
239
240                                 channel.UpdateBytesLost(k->data.getSize());
241                                 k->resend_count++;
242
243                                 if (k->resend_count > MAX_RELIABLE_RETRY) {
244                                         retry_count_exceeded = true;
245                                         timeouted_peers.push_back(peer->id);
246                                         /* no need to check additional packets if a single one did timeout*/
247                                         break;
248                                 }
249
250                                 LOG(derr_con << m_connection->getDesc()
251                                         << "RE-SENDING timed-out RELIABLE to "
252                                         << k->address.serializeString()
253                                         << "(t/o=" << resend_timeout << "): "
254                                         << "from_peer_id=" << peer_id
255                                         << ", channel=" << ((int) channelnum & 0xff)
256                                         << ", seqnum=" << seqnum
257                                         << std::endl);
258
259                                 rawSend(*k);
260
261                                 // do not handle rtt here as we can't decide if this packet was
262                                 // lost or really takes more time to transmit
263                         }
264
265                         if (retry_count_exceeded) {
266                                 break; /* no need to check other channels if we already did timeout */
267                         }
268
269                         channel.UpdateTimers(dtime);
270                 }
271
272                 /* skip to next peer if we did timeout */
273                 if (retry_count_exceeded)
274                         continue;
275
276                 /* send ping if necessary */
277                 if (udpPeer->Ping(dtime, data)) {
278                         LOG(dout_con << m_connection->getDesc()
279                                 << "Sending ping for peer_id: " << udpPeer->id << std::endl);
280                         /* this may fail if there ain't a sequence number left */
281                         if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
282                                 //retrigger with reduced ping interval
283                                 udpPeer->Ping(4.0, data);
284                         }
285                 }
286
287                 udpPeer->RunCommandQueues(m_max_packet_size,
288                         m_max_commands_per_iteration,
289                         m_max_packets_requeued);
290         }
291
292         // Remove timed out peers
293         for (u16 timeouted_peer : timeouted_peers) {
294                 LOG(dout_con << m_connection->getDesc()
295                         << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
296                 m_connection->deletePeer(timeouted_peer, true);
297         }
298 }
299
300 void ConnectionSendThread::rawSend(const BufferedPacket &packet)
301 {
302         try {
303                 m_connection->m_udpSocket.Send(packet.address, *packet.data,
304                         packet.data.getSize());
305                 LOG(dout_con << m_connection->getDesc()
306                         << " rawSend: " << packet.data.getSize()
307                         << " bytes sent" << std::endl);
308         } catch (SendFailedException &e) {
309                 LOG(derr_con << m_connection->getDesc()
310                         << "Connection::rawSend(): SendFailedException: "
311                         << packet.address.serializeString() << std::endl);
312         }
313 }
314
315 void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
316 {
317         try {
318                 p.absolute_send_time = porting::getTimeMs();
319                 // Buffer the packet
320                 channel->outgoing_reliables_sent.insert(p,
321                         (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
322                                 % (MAX_RELIABLE_WINDOW_SIZE + 1));
323         }
324         catch (AlreadyExistsException &e) {
325                 LOG(derr_con << m_connection->getDesc()
326                         << "WARNING: Going to send a reliable packet"
327                         << " in outgoing buffer" << std::endl);
328         }
329
330         // Send the packet
331         rawSend(p);
332 }
333
334 bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
335         const SharedBuffer<u8> &data, bool reliable)
336 {
337         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
338         if (!peer) {
339                 LOG(errorstream << m_connection->getDesc()
340                         << " dropped " << (reliable ? "reliable " : "")
341                         << "packet for non existent peer_id: " << peer_id << std::endl);
342                 return false;
343         }
344         Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
345
346         if (reliable) {
347                 bool have_sequence_number_for_raw_packet = true;
348                 u16 seqnum =
349                         channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
350
351                 if (!have_sequence_number_for_raw_packet)
352                         return false;
353
354                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
355                 Address peer_address;
356                 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
357
358                 // Add base headers and make a packet
359                 BufferedPacket p = con::makePacket(peer_address, reliable,
360                         m_connection->GetProtocolID(), m_connection->GetPeerID(),
361                         channelnum);
362
363                 // first check if our send window is already maxed out
364                 if (channel->outgoing_reliables_sent.size()
365                         < channel->getWindowSize()) {
366                         LOG(dout_con << m_connection->getDesc()
367                                 << " INFO: sending a reliable packet to peer_id " << peer_id
368                                 << " channel: " << (u32)channelnum
369                                 << " seqnum: " << seqnum << std::endl);
370                         sendAsPacketReliable(p, channel);
371                         return true;
372                 }
373
374                 LOG(dout_con << m_connection->getDesc()
375                         << " INFO: queueing reliable packet for peer_id: " << peer_id
376                         << " channel: " << (u32)channelnum
377                         << " seqnum: " << seqnum << std::endl);
378                 channel->queued_reliables.push(p);
379                 return false;
380         }
381
382         Address peer_address;
383         if (peer->getAddress(MTP_UDP, peer_address)) {
384                 // Add base headers and make a packet
385                 BufferedPacket p = con::makePacket(peer_address, data,
386                         m_connection->GetProtocolID(), m_connection->GetPeerID(),
387                         channelnum);
388
389                 // Send the packet
390                 rawSend(p);
391                 return true;
392         }
393
394         LOG(dout_con << m_connection->getDesc()
395                 << " INFO: dropped unreliable packet for peer_id: " << peer_id
396                 << " because of (yet) missing udp address" << std::endl);
397         return false;
398 }
399
400 void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
401 {
402         assert(c.reliable);  // Pre-condition
403
404         switch (c.type) {
405                 case CONNCMD_NONE:
406                         LOG(dout_con << m_connection->getDesc()
407                                 << "UDP processing reliable CONNCMD_NONE" << std::endl);
408                         return;
409
410                 case CONNCMD_SEND:
411                         LOG(dout_con << m_connection->getDesc()
412                                 << "UDP processing reliable CONNCMD_SEND" << std::endl);
413                         sendReliable(c);
414                         return;
415
416                 case CONNCMD_SEND_TO_ALL:
417                         LOG(dout_con << m_connection->getDesc()
418                                 << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
419                         sendToAllReliable(c);
420                         return;
421
422                 case CONCMD_CREATE_PEER:
423                         LOG(dout_con << m_connection->getDesc()
424                                 << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
425                         if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
426                                 /* put to queue if we couldn't send it immediately */
427                                 sendReliable(c);
428                         }
429                         return;
430
431                 case CONNCMD_SERVE:
432                 case CONNCMD_CONNECT:
433                 case CONNCMD_DISCONNECT:
434                 case CONCMD_ACK:
435                         FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
436                 default:
437                         LOG(dout_con << m_connection->getDesc()
438                                 << " Invalid reliable command type: " << c.type << std::endl);
439         }
440 }
441
442
443 void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
444 {
445         assert(!c.reliable); // Pre-condition
446
447         switch (c.type) {
448                 case CONNCMD_NONE:
449                         LOG(dout_con << m_connection->getDesc()
450                                 << " UDP processing CONNCMD_NONE" << std::endl);
451                         return;
452                 case CONNCMD_SERVE:
453                         LOG(dout_con << m_connection->getDesc()
454                                 << " UDP processing CONNCMD_SERVE port="
455                                 << c.address.serializeString() << std::endl);
456                         serve(c.address);
457                         return;
458                 case CONNCMD_CONNECT:
459                         LOG(dout_con << m_connection->getDesc()
460                                 << " UDP processing CONNCMD_CONNECT" << std::endl);
461                         connect(c.address);
462                         return;
463                 case CONNCMD_DISCONNECT:
464                         LOG(dout_con << m_connection->getDesc()
465                                 << " UDP processing CONNCMD_DISCONNECT" << std::endl);
466                         disconnect();
467                         return;
468                 case CONNCMD_DISCONNECT_PEER:
469                         LOG(dout_con << m_connection->getDesc()
470                                 << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
471                         disconnect_peer(c.peer_id);
472                         return;
473                 case CONNCMD_SEND:
474                         LOG(dout_con << m_connection->getDesc()
475                                 << " UDP processing CONNCMD_SEND" << std::endl);
476                         send(c.peer_id, c.channelnum, c.data);
477                         return;
478                 case CONNCMD_SEND_TO_ALL:
479                         LOG(dout_con << m_connection->getDesc()
480                                 << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
481                         sendToAll(c.channelnum, c.data);
482                         return;
483                 case CONCMD_ACK:
484                         LOG(dout_con << m_connection->getDesc()
485                                 << " UDP processing CONCMD_ACK" << std::endl);
486                         sendAsPacket(c.peer_id, c.channelnum, c.data, true);
487                         return;
488                 case CONCMD_CREATE_PEER:
489                         FATAL_ERROR("Got command that should be reliable as unreliable command");
490                 default:
491                         LOG(dout_con << m_connection->getDesc()
492                                 << " Invalid command type: " << c.type << std::endl);
493         }
494 }
495
496 void ConnectionSendThread::serve(Address bind_address)
497 {
498         LOG(dout_con << m_connection->getDesc()
499                 << "UDP serving at port " << bind_address.serializeString() << std::endl);
500         try {
501                 m_connection->m_udpSocket.Bind(bind_address);
502                 m_connection->SetPeerID(PEER_ID_SERVER);
503         }
504         catch (SocketException &e) {
505                 // Create event
506                 ConnectionEvent ce;
507                 ce.bindFailed();
508                 m_connection->putEvent(ce);
509         }
510 }
511
512 void ConnectionSendThread::connect(Address address)
513 {
514         LOG(dout_con << m_connection->getDesc() << " connecting to "
515                 << address.serializeString()
516                 << ":" << address.getPort() << std::endl);
517
518         UDPPeer *peer = m_connection->createServerPeer(address);
519
520         // Create event
521         ConnectionEvent e;
522         e.peerAdded(peer->id, peer->address);
523         m_connection->putEvent(e);
524
525         Address bind_addr;
526
527         if (address.isIPv6())
528                 bind_addr.setAddress((IPv6AddressBytes *) NULL);
529         else
530                 bind_addr.setAddress(0, 0, 0, 0);
531
532         m_connection->m_udpSocket.Bind(bind_addr);
533
534         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
535         m_connection->SetPeerID(PEER_ID_INEXISTENT);
536         NetworkPacket pkt(0, 0);
537         m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
538 }
539
540 void ConnectionSendThread::disconnect()
541 {
542         LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
543
544         // Create and send DISCO packet
545         SharedBuffer<u8> data(2);
546         writeU8(&data[0], PACKET_TYPE_CONTROL);
547         writeU8(&data[1], CONTROLTYPE_DISCO);
548
549
550         // Send to all
551         std::list<session_t> peerids = m_connection->getPeerIDs();
552
553         for (session_t peerid : peerids) {
554                 sendAsPacket(peerid, 0, data, false);
555         }
556 }
557
558 void ConnectionSendThread::disconnect_peer(session_t peer_id)
559 {
560         LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
561
562         // Create and send DISCO packet
563         SharedBuffer<u8> data(2);
564         writeU8(&data[0], PACKET_TYPE_CONTROL);
565         writeU8(&data[1], CONTROLTYPE_DISCO);
566         sendAsPacket(peer_id, 0, data, false);
567
568         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
569
570         if (!peer)
571                 return;
572
573         if (dynamic_cast<UDPPeer *>(&peer) == 0) {
574                 return;
575         }
576
577         dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
578 }
579
580 void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
581         const SharedBuffer<u8> &data)
582 {
583         assert(channelnum < CHANNEL_COUNT); // Pre-condition
584
585         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
586         if (!peer) {
587                 LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
588                         << ">>>NOT<<< found on sending packet"
589                         << ", channel " << (channelnum % 0xFF)
590                         << ", size: " << data.getSize() << std::endl);
591                 return;
592         }
593
594         LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
595                 << ", channel " << (channelnum % 0xFF)
596                 << ", size: " << data.getSize() << std::endl);
597
598         u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
599
600         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
601         std::list<SharedBuffer<u8>> originals;
602
603         makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
604
605         peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
606
607         for (const SharedBuffer<u8> &original : originals) {
608                 sendAsPacket(peer_id, channelnum, original);
609         }
610 }
611
612 void ConnectionSendThread::sendReliable(ConnectionCommand &c)
613 {
614         PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
615         if (!peer)
616                 return;
617
618         peer->PutReliableSendCommand(c, m_max_packet_size);
619 }
620
621 void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
622 {
623         std::list<session_t> peerids = m_connection->getPeerIDs();
624
625         for (session_t peerid : peerids) {
626                 send(peerid, channelnum, data);
627         }
628 }
629
630 void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
631 {
632         std::list<session_t> peerids = m_connection->getPeerIDs();
633
634         for (session_t peerid : peerids) {
635                 PeerHelper peer = m_connection->getPeerNoEx(peerid);
636
637                 if (!peer)
638                         continue;
639
640                 peer->PutReliableSendCommand(c, m_max_packet_size);
641         }
642 }
643
644 void ConnectionSendThread::sendPackets(float dtime)
645 {
646         std::list<session_t> peerIds = m_connection->getPeerIDs();
647         std::list<session_t> pendingDisconnect;
648         std::map<session_t, bool> pending_unreliable;
649
650         const unsigned int peer_packet_quota = m_iteration_packets_avaialble
651                 / MYMAX(peerIds.size(), 1);
652
653         for (session_t peerId : peerIds) {
654                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
655                 //peer may have been removed
656                 if (!peer) {
657                         LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
658                                 << peerId
659                                 << std::endl);
660                         continue;
661                 }
662                 peer->m_increment_packets_remaining = peer_packet_quota;
663
664                 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
665
666                 if (!udpPeer) {
667                         continue;
668                 }
669
670                 if (udpPeer->m_pending_disconnect) {
671                         pendingDisconnect.push_back(peerId);
672                 }
673
674                 PROFILE(std::stringstream
675                 peerIdentifier);
676                 PROFILE(
677                         peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
678                                 << ";RELIABLE]");
679                 PROFILE(ScopeProfiler
680                 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
681
682                 LOG(dout_con << m_connection->getDesc()
683                         << " Handle per peer queues: peer_id=" << peerId
684                         << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
685
686                 // first send queued reliable packets for all peers (if possible)
687                 for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
688                         Channel &channel = udpPeer->channels[i];
689                         u16 next_to_ack = 0;
690
691                         channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
692                         u16 next_to_receive = 0;
693                         channel.incoming_reliables.getFirstSeqnum(next_to_receive);
694
695                         LOG(dout_con << m_connection->getDesc() << "\t channel: "
696                                 << i << ", peer quota:"
697                                 << peer->m_increment_packets_remaining
698                                 << std::endl
699                                 << "\t\t\treliables on wire: "
700                                 << channel.outgoing_reliables_sent.size()
701                                 << ", waiting for ack for " << next_to_ack
702                                 << std::endl
703                                 << "\t\t\tincoming_reliables: "
704                                 << channel.incoming_reliables.size()
705                                 << ", next reliable packet: "
706                                 << channel.readNextIncomingSeqNum()
707                                 << ", next queued: " << next_to_receive
708                                 << std::endl
709                                 << "\t\t\treliables queued : "
710                                 << channel.queued_reliables.size()
711                                 << std::endl
712                                 << "\t\t\tqueued commands  : "
713                                 << channel.queued_commands.size()
714                                 << std::endl);
715
716                         while (!channel.queued_reliables.empty() &&
717                                         channel.outgoing_reliables_sent.size()
718                                         < channel.getWindowSize() &&
719                                         peer->m_increment_packets_remaining > 0) {
720                                 BufferedPacket p = channel.queued_reliables.front();
721                                 channel.queued_reliables.pop();
722                                 LOG(dout_con << m_connection->getDesc()
723                                         << " INFO: sending a queued reliable packet "
724                                         << " channel: " << i
725                                         << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
726                                         << std::endl);
727                                 sendAsPacketReliable(p, &channel);
728                                 peer->m_increment_packets_remaining--;
729                         }
730                 }
731         }
732
733         if (!m_outgoing_queue.empty()) {
734                 LOG(dout_con << m_connection->getDesc()
735                         << " Handle non reliable queue ("
736                         << m_outgoing_queue.size() << " pkts)" << std::endl);
737         }
738
739         unsigned int initial_queuesize = m_outgoing_queue.size();
740         /* send non reliable packets*/
741         for (unsigned int i = 0; i < initial_queuesize; i++) {
742                 OutgoingPacket packet = m_outgoing_queue.front();
743                 m_outgoing_queue.pop();
744
745                 if (packet.reliable)
746                         continue;
747
748                 PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
749                 if (!peer) {
750                         LOG(dout_con << m_connection->getDesc()
751                                 << " Outgoing queue: peer_id=" << packet.peer_id
752                                 << ">>>NOT<<< found on sending packet"
753                                 << ", channel " << (packet.channelnum % 0xFF)
754                                 << ", size: " << packet.data.getSize() << std::endl);
755                         continue;
756                 }
757
758                 /* send acks immediately */
759                 if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
760                         rawSendAsPacket(packet.peer_id, packet.channelnum,
761                                 packet.data, packet.reliable);
762                         if (peer->m_increment_packets_remaining > 0)
763                                 peer->m_increment_packets_remaining--;
764                 } else {
765                         m_outgoing_queue.push(packet);
766                         pending_unreliable[packet.peer_id] = true;
767                 }
768         }
769
770         if (peer_packet_quota > 0) {
771                 for (session_t peerId : peerIds) {
772                         PeerHelper peer = m_connection->getPeerNoEx(peerId);
773                         if (!peer)
774                                 continue;
775                         if (peer->m_increment_packets_remaining == 0) {
776                                 LOG(warningstream << m_connection->getDesc()
777                                         << " Packet quota used up for peer_id=" << peerId
778                                         << ", was " << peer_packet_quota << " pkts" << std::endl);
779                         }
780                 }
781         }
782
783         for (session_t peerId : pendingDisconnect) {
784                 if (!pending_unreliable[peerId]) {
785                         m_connection->deletePeer(peerId, false);
786                 }
787         }
788 }
789
790 void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
791         const SharedBuffer<u8> &data, bool ack)
792 {
793         OutgoingPacket packet(peer_id, channelnum, data, false, ack);
794         m_outgoing_queue.push(packet);
795 }
796
797 ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
798         Thread("ConnectionReceive")
799 {
800 }
801
802 void *ConnectionReceiveThread::run()
803 {
804         assert(m_connection);
805
806         LOG(dout_con << m_connection->getDesc()
807                 << "ConnectionReceive thread started" << std::endl);
808
809         PROFILE(std::stringstream
810         ThreadIdentifier);
811         PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
812
813         // use IPv6 minimum allowed MTU as receive buffer size as this is
814         // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
815         // infrastructure
816         const unsigned int packet_maxsize = 1500;
817         SharedBuffer<u8> packetdata(packet_maxsize);
818
819         bool packet_queued = true;
820
821 #ifdef DEBUG_CONNECTION_KBPS
822         u64 curtime = porting::getTimeMs();
823         u64 lasttime = curtime;
824         float debug_print_timer = 0.0;
825 #endif
826
827         while (!stopRequested()) {
828                 BEGIN_DEBUG_EXCEPTION_HANDLER
829                 PROFILE(ScopeProfiler
830                 sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
831
832 #ifdef DEBUG_CONNECTION_KBPS
833                 lasttime = curtime;
834                 curtime = porting::getTimeMs();
835                 float dtime = CALC_DTIME(lasttime,curtime);
836 #endif
837
838                 /* receive packets */
839                 receive(packetdata, packet_queued);
840
841 #ifdef DEBUG_CONNECTION_KBPS
842                 debug_print_timer += dtime;
843                 if (debug_print_timer > 20.0) {
844                         debug_print_timer -= 20.0;
845
846                         std::list<session_t> peerids = m_connection->getPeerIDs();
847
848                         for (std::list<session_t>::iterator i = peerids.begin();
849                                         i != peerids.end();
850                                         i++)
851                         {
852                                 PeerHelper peer = m_connection->getPeerNoEx(*i);
853                                 if (!peer)
854                                         continue;
855
856                                 float peer_current = 0.0;
857                                 float peer_loss = 0.0;
858                                 float avg_rate = 0.0;
859                                 float avg_loss = 0.0;
860
861                                 for(u16 j=0; j<CHANNEL_COUNT; j++)
862                                 {
863                                         peer_current +=peer->channels[j].getCurrentDownloadRateKB();
864                                         peer_loss += peer->channels[j].getCurrentLossRateKB();
865                                         avg_rate += peer->channels[j].getAvgDownloadRateKB();
866                                         avg_loss += peer->channels[j].getAvgLossRateKB();
867                                 }
868
869                                 std::stringstream output;
870                                 output << std::fixed << std::setprecision(1);
871                                 output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
872                                 output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
873                                 output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
874                                 output << std::setfill(' ');
875                                 for(u16 j=0; j<CHANNEL_COUNT; j++)
876                                 {
877                                         output << "\tcha " << j << ":"
878                                                 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
879                                                 << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
880                                                 << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
881                                                 << " /"
882                                                 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
883                                                 << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
884                                                 << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
885                                                 << " / WS: " << peer->channels[j].getWindowSize()
886                                                 << std::endl;
887                                 }
888
889                                 fprintf(stderr,"%s\n",output.str().c_str());
890                         }
891                 }
892 #endif
893                 END_DEBUG_EXCEPTION_HANDLER
894         }
895
896         PROFILE(g_profiler->remove(ThreadIdentifier.str()));
897         return NULL;
898 }
899
900 // Receive packets from the network and buffers and create ConnectionEvents
901 void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
902                 bool &packet_queued)
903 {
904         try {
905                 // First, see if there any buffered packets we can process now
906                 if (packet_queued) {
907                         bool data_left = true;
908                         session_t peer_id;
909                         SharedBuffer<u8> resultdata;
910                         while (data_left) {
911                                 try {
912                                         data_left = getFromBuffers(peer_id, resultdata);
913                                         if (data_left) {
914                                                 ConnectionEvent e;
915                                                 e.dataReceived(peer_id, resultdata);
916                                                 m_connection->putEvent(e);
917                                         }
918                                 }
919                                 catch (ProcessedSilentlyException &e) {
920                                         /* try reading again */
921                                 }
922                         }
923                         packet_queued = false;
924                 }
925
926                 // Call Receive() to wait for incoming data
927                 Address sender;
928                 s32 received_size = m_connection->m_udpSocket.Receive(sender,
929                         *packetdata, packetdata.getSize());
930                 if (received_size < 0)
931                         return;
932
933                 if ((received_size < BASE_HEADER_SIZE) ||
934                         (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
935                         LOG(derr_con << m_connection->getDesc()
936                                 << "Receive(): Invalid incoming packet, "
937                                 << "size: " << received_size
938                                 << ", protocol: "
939                                 << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
940                                 << std::endl);
941                         return;
942                 }
943
944                 session_t peer_id = readPeerId(*packetdata);
945                 u8 channelnum = readChannel(*packetdata);
946
947                 if (channelnum > CHANNEL_COUNT - 1) {
948                         LOG(derr_con << m_connection->getDesc()
949                                 << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
950                         return;
951                 }
952
953                 /* Try to identify peer by sender address (may happen on join) */
954                 if (peer_id == PEER_ID_INEXISTENT) {
955                         peer_id = m_connection->lookupPeer(sender);
956                         // We do not have to remind the peer of its
957                         // peer id as the CONTROLTYPE_SET_PEER_ID
958                         // command was sent reliably.
959                 }
960
961                 /* The peer was not found in our lists. Add it. */
962                 if (peer_id == PEER_ID_INEXISTENT) {
963                         peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
964                 }
965
966                 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
967                 if (!peer) {
968                         LOG(dout_con << m_connection->getDesc()
969                                 << " got packet from unknown peer_id: "
970                                 << peer_id << " Ignoring." << std::endl);
971                         return;
972                 }
973
974                 // Validate peer address
975
976                 Address peer_address;
977                 if (peer->getAddress(MTP_UDP, peer_address)) {
978                         if (peer_address != sender) {
979                                 LOG(derr_con << m_connection->getDesc()
980                                         << " Peer " << peer_id << " sending from different address."
981                                         " Ignoring." << std::endl);
982                                 return;
983                         }
984                 } else {
985                         LOG(derr_con << m_connection->getDesc()
986                                 << " Peer " << peer_id << " doesn't have an address?!"
987                                 " Ignoring." << std::endl);
988                         return;
989                 }
990
991                 peer->ResetTimeout();
992
993                 Channel *channel = nullptr;
994                 if (dynamic_cast<UDPPeer *>(&peer)) {
995                         channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
996                 } else {
997                         LOG(derr_con << m_connection->getDesc()
998                                 << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
999                                 " Ignoring." << std::endl);
1000                         return;
1001                 }
1002
1003                 channel->UpdateBytesReceived(received_size);
1004
1005                 // Throw the received packet to channel->processPacket()
1006
1007                 // Make a new SharedBuffer from the data without the base headers
1008                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1009                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1010                         strippeddata.getSize());
1011
1012                 try {
1013                         // Process it (the result is some data with no headers made by us)
1014                         SharedBuffer<u8> resultdata = processPacket
1015                                 (channel, strippeddata, peer_id, channelnum, false);
1016
1017                         LOG(dout_con << m_connection->getDesc()
1018                                 << " ProcessPacket from peer_id: " << peer_id
1019                                 << ", channel: " << (u32)channelnum << ", returned "
1020                                 << resultdata.getSize() << " bytes" << std::endl);
1021
1022                         ConnectionEvent e;
1023                         e.dataReceived(peer_id, resultdata);
1024                         m_connection->putEvent(e);
1025                 }
1026                 catch (ProcessedSilentlyException &e) {
1027                 }
1028                 catch (ProcessedQueued &e) {
1029                         // we set it to true anyway (see below)
1030                 }
1031
1032                 /* Every time we receive a packet it can happen that a previously
1033                  * buffered packet is now ready to process. */
1034                 packet_queued = true;
1035         }
1036         catch (InvalidIncomingDataException &e) {
1037         }
1038 }
1039
1040 bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
1041 {
1042         std::list<session_t> peerids = m_connection->getPeerIDs();
1043
1044         for (session_t peerid : peerids) {
1045                 PeerHelper peer = m_connection->getPeerNoEx(peerid);
1046                 if (!peer)
1047                         continue;
1048
1049                 if (dynamic_cast<UDPPeer *>(&peer) == 0)
1050                         continue;
1051
1052                 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
1053                         if (checkIncomingBuffers(&channel, peer_id, dst)) {
1054                                 return true;
1055                         }
1056                 }
1057         }
1058         return false;
1059 }
1060
1061 bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
1062         session_t &peer_id, SharedBuffer<u8> &dst)
1063 {
1064         u16 firstseqnum = 0;
1065         if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
1066                 if (firstseqnum == channel->readNextIncomingSeqNum()) {
1067                         BufferedPacket p = channel->incoming_reliables.popFirst();
1068                         peer_id = readPeerId(*p.data);
1069                         u8 channelnum = readChannel(*p.data);
1070                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
1071
1072                         LOG(dout_con << m_connection->getDesc()
1073                                 << "UNBUFFERING TYPE_RELIABLE"
1074                                 << " seqnum=" << seqnum
1075                                 << " peer_id=" << peer_id
1076                                 << " channel=" << ((int) channelnum & 0xff)
1077                                 << std::endl);
1078
1079                         channel->incNextIncomingSeqNum();
1080
1081                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1082                         // Get out the inside packet and re-process it
1083                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1084                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1085
1086                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1087                         return true;
1088                 }
1089         }
1090         return false;
1091 }
1092
1093 SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
1094         const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
1095 {
1096         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1097
1098         if (!peer) {
1099                 errorstream << "Peer not found (possible timeout)" << std::endl;
1100                 throw ProcessedSilentlyException("Peer not found (possible timeout)");
1101         }
1102
1103         if (packetdata.getSize() < 1)
1104                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1105
1106         u8 type = readU8(&(packetdata[0]));
1107
1108         if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
1109                 std::string errmsg = "Invalid peer_id=" + itos(peer_id);
1110                 errorstream << errmsg << std::endl;
1111                 throw InvalidIncomingDataException(errmsg.c_str());
1112         }
1113
1114         if (type >= PACKET_TYPE_MAX) {
1115                 derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
1116                         << std::endl;
1117                 throw InvalidIncomingDataException("Invalid packet type");
1118         }
1119
1120         const PacketTypeHandler &pHandle = packetTypeRouter[type];
1121         return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
1122 }
1123
1124 const ConnectionReceiveThread::PacketTypeHandler
1125         ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
1126         {&ConnectionReceiveThread::handlePacketType_Control},
1127         {&ConnectionReceiveThread::handlePacketType_Original},
1128         {&ConnectionReceiveThread::handlePacketType_Split},
1129         {&ConnectionReceiveThread::handlePacketType_Reliable},
1130 };
1131
1132 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
1133         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1134 {
1135         if (packetdata.getSize() < 2)
1136                 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1137
1138         u8 controltype = readU8(&(packetdata[1]));
1139
1140         if (controltype == CONTROLTYPE_ACK) {
1141                 assert(channel != NULL);
1142
1143                 if (packetdata.getSize() < 4) {
1144                         throw InvalidIncomingDataException(
1145                                 "packetdata.getSize() < 4 (ACK header size)");
1146                 }
1147
1148                 u16 seqnum = readU16(&packetdata[2]);
1149                 LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
1150                         << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
1151                         << seqnum << " ]" << std::endl);
1152
1153                 try {
1154                         BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
1155
1156                         // only calculate rtt from straight sent packets
1157                         if (p.resend_count == 0) {
1158                                 // Get round trip time
1159                                 u64 current_time = porting::getTimeMs();
1160
1161                                 // a overflow is quite unlikely but as it'd result in major
1162                                 // rtt miscalculation we handle it here
1163                                 if (current_time > p.absolute_send_time) {
1164                                         float rtt = (current_time - p.absolute_send_time) / 1000.0;
1165
1166                                         // Let peer calculate stuff according to it
1167                                         // (avg_rtt and resend_timeout)
1168                                         dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1169                                 } else if (p.totaltime > 0) {
1170                                         float rtt = p.totaltime;
1171
1172                                         // Let peer calculate stuff according to it
1173                                         // (avg_rtt and resend_timeout)
1174                                         dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1175                                 }
1176                         }
1177                         // put bytes for max bandwidth calculation
1178                         channel->UpdateBytesSent(p.data.getSize(), 1);
1179                         if (channel->outgoing_reliables_sent.size() == 0)
1180                                 m_connection->TriggerSend();
1181                 } catch (NotFoundException &e) {
1182                         LOG(derr_con << m_connection->getDesc()
1183                                 << "WARNING: ACKed packet not in outgoing queue"
1184                                 << " seqnum=" << seqnum << std::endl);
1185                         channel->UpdatePacketTooLateCounter();
1186                 }
1187
1188                 throw ProcessedSilentlyException("Got an ACK");
1189         } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
1190                 // Got a packet to set our peer id
1191                 if (packetdata.getSize() < 4)
1192                         throw InvalidIncomingDataException
1193                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1194                 session_t peer_id_new = readU16(&packetdata[2]);
1195                 LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
1196                         << "... " << std::endl);
1197
1198                 if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
1199                         LOG(derr_con << m_connection->getDesc()
1200                                 << "WARNING: Not changing existing peer id." << std::endl);
1201                 } else {
1202                         LOG(dout_con << m_connection->getDesc() << "changing own peer id"
1203                                 << std::endl);
1204                         m_connection->SetPeerID(peer_id_new);
1205                 }
1206
1207                 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1208         } else if (controltype == CONTROLTYPE_PING) {
1209                 // Just ignore it, the incoming data already reset
1210                 // the timeout counter
1211                 LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
1212                 throw ProcessedSilentlyException("Got a PING");
1213         } else if (controltype == CONTROLTYPE_DISCO) {
1214                 // Just ignore it, the incoming data already reset
1215                 // the timeout counter
1216                 LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
1217                         << peer->id << std::endl);
1218
1219                 if (!m_connection->deletePeer(peer->id, false)) {
1220                         derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
1221                 }
1222
1223                 throw ProcessedSilentlyException("Got a DISCO");
1224         } else {
1225                 LOG(derr_con << m_connection->getDesc()
1226                         << "INVALID TYPE_CONTROL: invalid controltype="
1227                         << ((int) controltype & 0xff) << std::endl);
1228                 throw InvalidIncomingDataException("Invalid control type");
1229         }
1230 }
1231
1232 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
1233         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1234 {
1235         if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
1236                 throw InvalidIncomingDataException
1237                         ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
1238         LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
1239                 << std::endl);
1240         // Get the inside packet out and return it
1241         SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1242         memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
1243         return payload;
1244 }
1245
1246 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
1247         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1248 {
1249         Address peer_address;
1250
1251         if (peer->getAddress(MTP_UDP, peer_address)) {
1252                 // We have to create a packet again for buffering
1253                 // This isn't actually too bad an idea.
1254                 BufferedPacket packet = makePacket(peer_address,
1255                         packetdata,
1256                         m_connection->GetProtocolID(),
1257                         peer->id,
1258                         channelnum);
1259
1260                 // Buffer the packet
1261                 SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
1262
1263                 if (data.getSize() != 0) {
1264                         LOG(dout_con << m_connection->getDesc()
1265                                 << "RETURNING TYPE_SPLIT: Constructed full data, "
1266                                 << "size=" << data.getSize() << std::endl);
1267                         return data;
1268                 }
1269                 LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
1270                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1271         }
1272
1273         // We should never get here.
1274         FATAL_ERROR("Invalid execution point");
1275 }
1276
1277 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
1278         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1279 {
1280         assert(channel != NULL);
1281
1282         // Recursive reliable packets not allowed
1283         if (reliable)
1284                 throw InvalidIncomingDataException("Found nested reliable packets");
1285
1286         if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
1287                 throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1288
1289         u16 seqnum = readU16(&packetdata[1]);
1290         bool is_future_packet = false;
1291         bool is_old_packet = false;
1292
1293         /* packet is within our receive window send ack */
1294         if (seqnum_in_window(seqnum,
1295                 channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
1296                 m_connection->sendAck(peer->id, channelnum, seqnum);
1297         } else {
1298                 is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
1299                 is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
1300
1301                 /* packet is not within receive window, don't send ack.           *
1302                  * if this was a valid packet it's gonna be retransmitted         */
1303                 if (is_future_packet)
1304                         throw ProcessedSilentlyException(
1305                                 "Received packet newer then expected, not sending ack");
1306
1307                 /* seems like our ack was lost, send another one for a old packet */
1308                 if (is_old_packet) {
1309                         LOG(dout_con << m_connection->getDesc()
1310                                 << "RE-SENDING ACK: peer_id: " << peer->id
1311                                 << ", channel: " << (channelnum & 0xFF)
1312                                 << ", seqnum: " << seqnum << std::endl;)
1313                         m_connection->sendAck(peer->id, channelnum, seqnum);
1314
1315                         // we already have this packet so this one was on wire at least
1316                         // the current timeout
1317                         // we don't know how long this packet was on wire don't do silly guessing
1318                         // dynamic_cast<UDPPeer*>(&peer)->
1319                         //     reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
1320
1321                         throw ProcessedSilentlyException("Retransmitting ack for old packet");
1322                 }
1323         }
1324
1325         if (seqnum != channel->readNextIncomingSeqNum()) {
1326                 Address peer_address;
1327
1328                 // this is a reliable packet so we have a udp address for sure
1329                 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
1330                 // This one comes later, buffer it.
1331                 // Actually we have to make a packet to buffer one.
1332                 // Well, we have all the ingredients, so just do it.
1333                 BufferedPacket packet = con::makePacket(
1334                         peer_address,
1335                         packetdata,
1336                         m_connection->GetProtocolID(),
1337                         peer->id,
1338                         channelnum);
1339                 try {
1340                         channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
1341
1342                         LOG(dout_con << m_connection->getDesc()
1343                                 << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
1344                                 << ", channel: " << (channelnum & 0xFF)
1345                                 << ", seqnum: " << seqnum << std::endl;)
1346
1347                         throw ProcessedQueued("Buffered future reliable packet");
1348                 } catch (AlreadyExistsException &e) {
1349                 } catch (IncomingDataCorruption &e) {
1350                         ConnectionCommand discon;
1351                         discon.disconnect_peer(peer->id);
1352                         m_connection->putCommand(discon);
1353
1354                         LOG(derr_con << m_connection->getDesc()
1355                                 << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
1356                                 << ", channel: " << (channelnum & 0xFF)
1357                                 << ", seqnum: " << seqnum
1358                                 << "DROPPING CLIENT!" << std::endl;)
1359                 }
1360         }
1361
1362         /* we got a packet to process right now */
1363         LOG(dout_con << m_connection->getDesc()
1364                 << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
1365                 << ", channel: " << (channelnum & 0xFF)
1366                 << ", seqnum: " << seqnum << std::endl;)
1367
1368
1369         /* check for resend case */
1370         u16 queued_seqnum = 0;
1371         if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
1372                 if (queued_seqnum == seqnum) {
1373                         BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
1374                         /** TODO find a way to verify the new against the old packet */
1375                 }
1376         }
1377
1378         channel->incNextIncomingSeqNum();
1379
1380         // Get out the inside packet and re-process it
1381         SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1382         memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1383
1384         return processPacket(channel, payload, peer->id, channelnum, true);
1385 }
1386
1387 }