Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add locking around sockets, since QUdpSocket isn't thread safe enough. #1122

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 44 additions & 14 deletions libraries/networking/src/udt/Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ Socket::Socket(QObject* parent, bool shouldChangeSocketOptions) :
}

void Socket::bind(SocketType socketType, const QHostAddress& address, quint16 port) {
_networkSocket.bind(socketType, address, port);
{
Guard socketLock(_socketMutex);
_networkSocket.bind(socketType, address, port);
}

if (_shouldChangeSocketOptions) {
setSystemBufferSizes(socketType);
Expand All @@ -68,25 +71,34 @@ void Socket::bind(SocketType socketType, const QHostAddress& address, quint16 po
}

#if defined(Q_OS_LINUX)
auto sd = _networkSocket.socketDescriptor(socketType);
int val = IP_PMTUDISC_DONT;
setsockopt(sd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val));
{
Guard socketLock(_socketMutex);
auto sd = _networkSocket.socketDescriptor(socketType);
int val = IP_PMTUDISC_DONT;
setsockopt(sd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val));
}
#elif defined(Q_OS_WIN)
auto sd = _networkSocket.socketDescriptor(socketType);
int val = 0; // false
if (setsockopt(sd, IPPROTO_IP, IP_DONTFRAGMENT, (const char *)&val, sizeof(val))) {
auto wsaErr = WSAGetLastError();
qCWarning(networking) << "Socket::bind Cannot setsockopt IP_DONTFRAGMENT" << wsaErr;
{
Guard socketLock(_socketMutex);

auto sd = _networkSocket.socketDescriptor(socketType);
int val = 0; // false
if (setsockopt(sd, IPPROTO_IP, IP_DONTFRAGMENT, (const char *)&val, sizeof(val))) {
auto wsaErr = WSAGetLastError();
qCWarning(networking) << "Socket::bind Cannot setsockopt IP_DONTFRAGMENT" << wsaErr;
}
}
#endif
}
}

void Socket::rebind(SocketType socketType) {
Guard socketLock(_socketMutex);
rebind(socketType, _networkSocket.localPort(socketType));
}

void Socket::rebind(SocketType socketType, quint16 localPort) {
Guard socketLock(_socketMutex);
_networkSocket.abort(socketType);
bind(socketType, QHostAddress::AnyIPv4, localPort);
}
Expand All @@ -98,6 +110,8 @@ const WebRTCSocket* Socket::getWebRTCSocket() {
#endif

void Socket::setSystemBufferSizes(SocketType socketType) {
Guard socketLock(_socketMutex);

for (int i = 0; i < 2; i++) {
QAbstractSocket::SocketOption bufferOpt;
QString bufferTypeString;
Expand All @@ -106,13 +120,13 @@ void Socket::setSystemBufferSizes(SocketType socketType) {

if (i == 0) {
bufferOpt = QAbstractSocket::SendBufferSizeSocketOption;
numBytes = socketType == SocketType::UDP
numBytes = socketType == SocketType::UDP
? udt::UDP_SEND_BUFFER_SIZE_BYTES : udt::WEBRTC_SEND_BUFFER_SIZE_BYTES;
bufferTypeString = "send";

} else {
bufferOpt = QAbstractSocket::ReceiveBufferSizeSocketOption;
numBytes = socketType == SocketType::UDP
numBytes = socketType == SocketType::UDP
? udt::UDP_RECEIVE_BUFFER_SIZE_BYTES : udt::WEBRTC_RECEIVE_BUFFER_SIZE_BYTES;
bufferTypeString = "receive";
}
Expand Down Expand Up @@ -245,6 +259,8 @@ qint64 Socket::writeDatagram(const char* data, qint64 size, const SockAddr& sock
}

qint64 Socket::writeDatagram(const QByteArray& datagram, const SockAddr& sockAddr) {
Guard socketLock(_socketMutex);

auto socketType = sockAddr.getType();

// don't attempt to write the datagram if we're unbound. Just drop it.
Expand Down Expand Up @@ -355,6 +371,8 @@ void Socket::messageFailed(Connection* connection, Packet::MessageNumber message
}

void Socket::checkForReadyReadBackup() {
Guard socketLock(_socketMutex);

if (_networkSocket.hasPendingDatagrams()) {
qCDebug(networking) << "Socket::checkForReadyReadBackup() detected blocked readyRead signal. Flushing pending datagrams.";

Expand All @@ -377,14 +395,20 @@ void Socket::checkForReadyReadBackup() {
}
}

// This is here just so that we can do locking in the while below.
bool Socket::hasPending(int &packetSizeWithHeader) {
Guard socketLock(_socketMutex);
return _networkSocket.hasPendingDatagrams() &&
(packetSizeWithHeader = _networkSocket.pendingDatagramSize()) != -1;
}

void Socket::readPendingDatagrams() {
using namespace std::chrono;
static const auto MAX_PROCESS_TIME { 100ms };
const auto abortTime = system_clock::now() + MAX_PROCESS_TIME;
int packetSizeWithHeader = -1;

while (_networkSocket.hasPendingDatagrams() &&
(packetSizeWithHeader = _networkSocket.pendingDatagramSize()) != -1) {
while (hasPending(packetSizeWithHeader)) {
if (system_clock::now() > abortTime) {
// We've been running for too long, stop processing packets for now
// Once we've processed the event queue, we'll come back to packet processing
Expand All @@ -409,7 +433,11 @@ void Socket::readPendingDatagrams() {
auto buffer = std::unique_ptr<char[]>(new char[packetSizeWithHeader]);

// pull the datagram
auto sizeRead = _networkSocket.readDatagram(buffer.get(), packetSizeWithHeader, &senderSockAddr);
int sizeRead = 0;
{
Guard socketLock(_socketMutex);
sizeRead = _networkSocket.readDatagram(buffer.get(), packetSizeWithHeader, &senderSockAddr);
}

// save information for this packet, in case it is the one that sticks readyRead
_lastPacketSizeRead = sizeRead;
Expand Down Expand Up @@ -552,6 +580,8 @@ std::vector<SockAddr> Socket::getConnectionSockAddrs() {
}

void Socket::handleSocketError(SocketType socketType, QAbstractSocket::SocketError socketError) {
Guard socketLock(_socketMutex);

int wsaError = 0;
static std::atomic<int> previousWsaError(0);
#ifdef WIN32
Expand Down
5 changes: 5 additions & 0 deletions libraries/networking/src/udt/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class Socket : public QObject {

using Mutex = std::mutex;
using Lock = std::unique_lock<Mutex>;
using Guard = std::lock_guard<Mutex>;


public:
using StatsVector = std::vector<std::pair<SockAddr, ConnectionStats::Stats>>;
Expand Down Expand Up @@ -123,6 +125,7 @@ private slots:
std::vector<SockAddr> getConnectionSockAddrs();
void connectToSendSignal(const SockAddr& destinationAddr, QObject* receiver, const char* slot);

bool hasPending(int &packetSizeWithHeader);
Q_INVOKABLE void writeReliablePacket(Packet* packet, const SockAddr& sockAddr);
Q_INVOKABLE void writeReliablePacketList(PacketList* packetList, const SockAddr& sockAddr);

Expand All @@ -135,6 +138,8 @@ private slots:

Mutex _unreliableSequenceNumbersMutex;
Mutex _connectionsHashMutex;
Mutex _socketMutex;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the problem is QUdpSocket (and WebRTCSocket?), should this locking logic be internal to NetworkSocket.h instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes more sense to keep it in udt::Socket.

NetworkSocket.h is an abstraction that could be used in a place where it's only ever used in a single thread and therefore not need locking. The problem happens because udt::Socket is called from multiple threads. And it also already deals with locking for other purposes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe, but it could just as easily be used in another multithreaded context and then we open ourselves up to more difficult to debug threading crashes 😄

at the very least, can we add something to NetworkSocket about this? maybe just a big comment, although if we could detect this and log a warning (just the first time it happens) that would also be great.



std::unordered_map<SockAddr, BasePacketHandler> _unfilteredHandlers;
std::unordered_map<SockAddr, SequenceNumber> _unreliableSequenceNumbers;
Expand Down
Loading