From 964dbbb54ac163700438589a324c737f0e813b6e Mon Sep 17 00:00:00 2001 From: Dale Glass Date: Tue, 27 Aug 2024 22:57:47 +0200 Subject: [PATCH 1/2] Add locking around sockets, since QUdpSocket isn't thread safe enough. This fixes #1119 --- libraries/networking/src/udt/Socket.cpp | 55 +++++++++++++++++++------ libraries/networking/src/udt/Socket.h | 5 +++ 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index eeed2778755..1ed7f22d72d 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -59,7 +59,10 @@ Socket::Socket(QObject* parent, bool shouldChangeSocketOptions) : } void Socket::bind(SocketType socketType, const QHostAddress& address, quint16 port) { - _networkSocket.bind(socketType, address, port); + { + Guard socketLock(_socketMutex); + _networkSocket.bind(socketType, address, port); + } if (_shouldChangeSocketOptions) { setSystemBufferSizes(socketType); @@ -68,36 +71,48 @@ void Socket::bind(SocketType socketType, const QHostAddress& address, quint16 po } #if defined(Q_OS_LINUX) - auto sd = _networkSocket.socketDescriptor(socketType); - int val = IP_PMTUDISC_DONT; - setsockopt(sd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val)); + { + Guard socketLock(_socketMutex); + auto sd = _networkSocket.socketDescriptor(socketType); + int val = IP_PMTUDISC_DONT; + setsockopt(sd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val)); + } #elif defined(Q_OS_WIN) - auto sd = _networkSocket.socketDescriptor(socketType); - int val = 0; // false - if (setsockopt(sd, IPPROTO_IP, IP_DONTFRAGMENT, (const char *)&val, sizeof(val))) { - auto wsaErr = WSAGetLastError(); - qCWarning(networking) << "Socket::bind Cannot setsockopt IP_DONTFRAGMENT" << wsaErr; + { + Guard socketLock(_socketMutex); + + auto sd = _networkSocket.socketDescriptor(socketType); + int val = 0; // false + if (setsockopt(sd, IPPROTO_IP, IP_DONTFRAGMENT, (const char *)&val, sizeof(val))) { + auto wsaErr = WSAGetLastError(); + qCWarning(networking) << "Socket::bind Cannot setsockopt IP_DONTFRAGMENT" << wsaErr; + } } #endif } } void Socket::rebind(SocketType socketType) { + Guard socketLock(_socketMutex); rebind(socketType, _networkSocket.localPort(socketType)); } void Socket::rebind(SocketType socketType, quint16 localPort) { + Guard socketLock(_socketMutex); _networkSocket.abort(socketType); bind(socketType, QHostAddress::AnyIPv4, localPort); } #if defined(WEBRTC_DATA_CHANNELS) const WebRTCSocket* Socket::getWebRTCSocket() { + Guard socketLock(_socketMutex); return _networkSocket.getWebRTCSocket(); } #endif void Socket::setSystemBufferSizes(SocketType socketType) { + Guard socketLock(_socketMutex); + for (int i = 0; i < 2; i++) { QAbstractSocket::SocketOption bufferOpt; QString bufferTypeString; @@ -245,6 +260,8 @@ qint64 Socket::writeDatagram(const char* data, qint64 size, const SockAddr& sock } qint64 Socket::writeDatagram(const QByteArray& datagram, const SockAddr& sockAddr) { + Guard socketLock(_socketMutex); + auto socketType = sockAddr.getType(); // don't attempt to write the datagram if we're unbound. Just drop it. @@ -355,6 +372,8 @@ void Socket::messageFailed(Connection* connection, Packet::MessageNumber message } void Socket::checkForReadyReadBackup() { + Guard socketLock(_socketMutex); + if (_networkSocket.hasPendingDatagrams()) { qCDebug(networking) << "Socket::checkForReadyReadBackup() detected blocked readyRead signal. Flushing pending datagrams."; @@ -377,14 +396,20 @@ void Socket::checkForReadyReadBackup() { } } +// This is here just so that we can do locking in the while below. +bool Socket::hasPending(int &packetSizeWithHeader) { + Guard socketLock(_socketMutex); + return _networkSocket.hasPendingDatagrams() && + (packetSizeWithHeader = _networkSocket.pendingDatagramSize()) != -1; +} + void Socket::readPendingDatagrams() { using namespace std::chrono; static const auto MAX_PROCESS_TIME { 100ms }; const auto abortTime = system_clock::now() + MAX_PROCESS_TIME; int packetSizeWithHeader = -1; - while (_networkSocket.hasPendingDatagrams() && - (packetSizeWithHeader = _networkSocket.pendingDatagramSize()) != -1) { + while (hasPending(packetSizeWithHeader)) { if (system_clock::now() > abortTime) { // We've been running for too long, stop processing packets for now // Once we've processed the event queue, we'll come back to packet processing @@ -409,7 +434,11 @@ void Socket::readPendingDatagrams() { auto buffer = std::unique_ptr(new char[packetSizeWithHeader]); // pull the datagram - auto sizeRead = _networkSocket.readDatagram(buffer.get(), packetSizeWithHeader, &senderSockAddr); + int sizeRead = 0; + { + Guard socketLock(_socketMutex); + sizeRead = _networkSocket.readDatagram(buffer.get(), packetSizeWithHeader, &senderSockAddr); + } // save information for this packet, in case it is the one that sticks readyRead _lastPacketSizeRead = sizeRead; @@ -552,6 +581,8 @@ std::vector Socket::getConnectionSockAddrs() { } void Socket::handleSocketError(SocketType socketType, QAbstractSocket::SocketError socketError) { + Guard socketLock(_socketMutex); + int wsaError = 0; static std::atomic previousWsaError(0); #ifdef WIN32 diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index ab9699bb8f4..6d625a64503 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -52,6 +52,8 @@ class Socket : public QObject { using Mutex = std::mutex; using Lock = std::unique_lock; + using Guard = std::lock_guard; + public: using StatsVector = std::vector>; @@ -123,6 +125,7 @@ private slots: std::vector getConnectionSockAddrs(); void connectToSendSignal(const SockAddr& destinationAddr, QObject* receiver, const char* slot); + bool hasPending(int &packetSizeWithHeader); Q_INVOKABLE void writeReliablePacket(Packet* packet, const SockAddr& sockAddr); Q_INVOKABLE void writeReliablePacketList(PacketList* packetList, const SockAddr& sockAddr); @@ -135,6 +138,8 @@ private slots: Mutex _unreliableSequenceNumbersMutex; Mutex _connectionsHashMutex; + Mutex _socketMutex; + std::unordered_map _unfilteredHandlers; std::unordered_map _unreliableSequenceNumbers; From 1804d668f874e9f967fb7cac24f7c59c1d15379e Mon Sep 17 00:00:00 2001 From: Dale Glass Date: Sun, 8 Sep 2024 00:03:53 +0200 Subject: [PATCH 2/2] Review fixes --- libraries/networking/src/udt/Socket.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 1ed7f22d72d..ff2e181f9bf 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -105,7 +105,6 @@ void Socket::rebind(SocketType socketType, quint16 localPort) { #if defined(WEBRTC_DATA_CHANNELS) const WebRTCSocket* Socket::getWebRTCSocket() { - Guard socketLock(_socketMutex); return _networkSocket.getWebRTCSocket(); } #endif @@ -121,13 +120,13 @@ void Socket::setSystemBufferSizes(SocketType socketType) { if (i == 0) { bufferOpt = QAbstractSocket::SendBufferSizeSocketOption; - numBytes = socketType == SocketType::UDP + numBytes = socketType == SocketType::UDP ? udt::UDP_SEND_BUFFER_SIZE_BYTES : udt::WEBRTC_SEND_BUFFER_SIZE_BYTES; bufferTypeString = "send"; } else { bufferOpt = QAbstractSocket::ReceiveBufferSizeSocketOption; - numBytes = socketType == SocketType::UDP + numBytes = socketType == SocketType::UDP ? udt::UDP_RECEIVE_BUFFER_SIZE_BYTES : udt::WEBRTC_RECEIVE_BUFFER_SIZE_BYTES; bufferTypeString = "receive"; }