diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index eeed277875..ff2e181f9b 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -59,7 +59,10 @@ Socket::Socket(QObject* parent, bool shouldChangeSocketOptions) : } void Socket::bind(SocketType socketType, const QHostAddress& address, quint16 port) { - _networkSocket.bind(socketType, address, port); + { + Guard socketLock(_socketMutex); + _networkSocket.bind(socketType, address, port); + } if (_shouldChangeSocketOptions) { setSystemBufferSizes(socketType); @@ -68,25 +71,34 @@ void Socket::bind(SocketType socketType, const QHostAddress& address, quint16 po } #if defined(Q_OS_LINUX) - auto sd = _networkSocket.socketDescriptor(socketType); - int val = IP_PMTUDISC_DONT; - setsockopt(sd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val)); + { + Guard socketLock(_socketMutex); + auto sd = _networkSocket.socketDescriptor(socketType); + int val = IP_PMTUDISC_DONT; + setsockopt(sd, IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val)); + } #elif defined(Q_OS_WIN) - auto sd = _networkSocket.socketDescriptor(socketType); - int val = 0; // false - if (setsockopt(sd, IPPROTO_IP, IP_DONTFRAGMENT, (const char *)&val, sizeof(val))) { - auto wsaErr = WSAGetLastError(); - qCWarning(networking) << "Socket::bind Cannot setsockopt IP_DONTFRAGMENT" << wsaErr; + { + Guard socketLock(_socketMutex); + + auto sd = _networkSocket.socketDescriptor(socketType); + int val = 0; // false + if (setsockopt(sd, IPPROTO_IP, IP_DONTFRAGMENT, (const char *)&val, sizeof(val))) { + auto wsaErr = WSAGetLastError(); + qCWarning(networking) << "Socket::bind Cannot setsockopt IP_DONTFRAGMENT" << wsaErr; + } } #endif } } void Socket::rebind(SocketType socketType) { + Guard socketLock(_socketMutex); rebind(socketType, _networkSocket.localPort(socketType)); } void Socket::rebind(SocketType socketType, quint16 localPort) { + Guard socketLock(_socketMutex); _networkSocket.abort(socketType); bind(socketType, QHostAddress::AnyIPv4, localPort); } @@ -98,6 +110,8 @@ const WebRTCSocket* Socket::getWebRTCSocket() { #endif void Socket::setSystemBufferSizes(SocketType socketType) { + Guard socketLock(_socketMutex); + for (int i = 0; i < 2; i++) { QAbstractSocket::SocketOption bufferOpt; QString bufferTypeString; @@ -106,13 +120,13 @@ void Socket::setSystemBufferSizes(SocketType socketType) { if (i == 0) { bufferOpt = QAbstractSocket::SendBufferSizeSocketOption; - numBytes = socketType == SocketType::UDP + numBytes = socketType == SocketType::UDP ? udt::UDP_SEND_BUFFER_SIZE_BYTES : udt::WEBRTC_SEND_BUFFER_SIZE_BYTES; bufferTypeString = "send"; } else { bufferOpt = QAbstractSocket::ReceiveBufferSizeSocketOption; - numBytes = socketType == SocketType::UDP + numBytes = socketType == SocketType::UDP ? udt::UDP_RECEIVE_BUFFER_SIZE_BYTES : udt::WEBRTC_RECEIVE_BUFFER_SIZE_BYTES; bufferTypeString = "receive"; } @@ -245,6 +259,8 @@ qint64 Socket::writeDatagram(const char* data, qint64 size, const SockAddr& sock } qint64 Socket::writeDatagram(const QByteArray& datagram, const SockAddr& sockAddr) { + Guard socketLock(_socketMutex); + auto socketType = sockAddr.getType(); // don't attempt to write the datagram if we're unbound. Just drop it. @@ -355,6 +371,8 @@ void Socket::messageFailed(Connection* connection, Packet::MessageNumber message } void Socket::checkForReadyReadBackup() { + Guard socketLock(_socketMutex); + if (_networkSocket.hasPendingDatagrams()) { qCDebug(networking) << "Socket::checkForReadyReadBackup() detected blocked readyRead signal. Flushing pending datagrams."; @@ -377,14 +395,20 @@ void Socket::checkForReadyReadBackup() { } } +// This is here just so that we can do locking in the while below. +bool Socket::hasPending(int &packetSizeWithHeader) { + Guard socketLock(_socketMutex); + return _networkSocket.hasPendingDatagrams() && + (packetSizeWithHeader = _networkSocket.pendingDatagramSize()) != -1; +} + void Socket::readPendingDatagrams() { using namespace std::chrono; static const auto MAX_PROCESS_TIME { 100ms }; const auto abortTime = system_clock::now() + MAX_PROCESS_TIME; int packetSizeWithHeader = -1; - while (_networkSocket.hasPendingDatagrams() && - (packetSizeWithHeader = _networkSocket.pendingDatagramSize()) != -1) { + while (hasPending(packetSizeWithHeader)) { if (system_clock::now() > abortTime) { // We've been running for too long, stop processing packets for now // Once we've processed the event queue, we'll come back to packet processing @@ -409,7 +433,11 @@ void Socket::readPendingDatagrams() { auto buffer = std::unique_ptr(new char[packetSizeWithHeader]); // pull the datagram - auto sizeRead = _networkSocket.readDatagram(buffer.get(), packetSizeWithHeader, &senderSockAddr); + int sizeRead = 0; + { + Guard socketLock(_socketMutex); + sizeRead = _networkSocket.readDatagram(buffer.get(), packetSizeWithHeader, &senderSockAddr); + } // save information for this packet, in case it is the one that sticks readyRead _lastPacketSizeRead = sizeRead; @@ -552,6 +580,8 @@ std::vector Socket::getConnectionSockAddrs() { } void Socket::handleSocketError(SocketType socketType, QAbstractSocket::SocketError socketError) { + Guard socketLock(_socketMutex); + int wsaError = 0; static std::atomic previousWsaError(0); #ifdef WIN32 diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index ab9699bb8f..6d625a6450 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -52,6 +52,8 @@ class Socket : public QObject { using Mutex = std::mutex; using Lock = std::unique_lock; + using Guard = std::lock_guard; + public: using StatsVector = std::vector>; @@ -123,6 +125,7 @@ private slots: std::vector getConnectionSockAddrs(); void connectToSendSignal(const SockAddr& destinationAddr, QObject* receiver, const char* slot); + bool hasPending(int &packetSizeWithHeader); Q_INVOKABLE void writeReliablePacket(Packet* packet, const SockAddr& sockAddr); Q_INVOKABLE void writeReliablePacketList(PacketList* packetList, const SockAddr& sockAddr); @@ -135,6 +138,8 @@ private slots: Mutex _unreliableSequenceNumbersMutex; Mutex _connectionsHashMutex; + Mutex _socketMutex; + std::unordered_map _unfilteredHandlers; std::unordered_map _unreliableSequenceNumbers;