Skip to content

Commit

Permalink
Fix non-thread-safe mutex access
Browse files Browse the repository at this point in the history
When running SwiftWebSocket with the Thread Sanitizer, we noticed that there was a swift race condition when accessing the Manager's, or InnerWebSocket's mutex. Suspecting that this was simply a bug with the Thread Sanitizer we filed a Radar with Apple.  Their response was this was actually a true positive and directed us to this article: http://www.russbishop.net/the-law.

Because Swifts `&` might allow for mutation on multiple threads, we need to allocate our own memory for the mutex to avoid a heap corruption.

tidwall/SwiftWebSocket PR#tidwall#141
IanHoar/SwiftWebSocket PR#1
  • Loading branch information
andriimakukha committed Dec 11, 2019
1 parent cf7aeb1 commit 2a7320d
Showing 1 changed file with 39 additions and 25 deletions.
64 changes: 39 additions & 25 deletions Source/WebSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ private class Deflater {
/// WebSocket objects are bidirectional network streams that communicate over HTTP. RFC 6455.
private class InnerWebSocket: Hashable {
var id : Int
var mutex = pthread_mutex_t()
var mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
let request : URLRequest!
let subProtocols : [String]!
var frames : [Frame] = []
Expand Down Expand Up @@ -611,9 +611,10 @@ private class InnerWebSocket: Hashable {
func hash(into hasher: inout Hasher) {
hasher.combine(id)
}

init(request: URLRequest, subProtocols : [String] = [], stub : Bool = false){
pthread_mutex_init(&mutex, nil)
mutex.initialize(to: pthread_mutex_t())
pthread_mutex_init(mutex, nil)
self.id = manager.nextId()
self.request = request
self.subProtocols = subProtocols
Expand All @@ -639,13 +640,14 @@ private class InnerWebSocket: Hashable {
if inputBytes != nil {
free(inputBytes)
}
pthread_mutex_init(&mutex, nil)
pthread_mutex_init(mutex, nil)
mutex.deallocate()
}
@inline(__always) fileprivate func lock(){
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
}
@inline(__always) fileprivate func unlock(){
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}

fileprivate var dirty : Bool {
Expand Down Expand Up @@ -1035,7 +1037,7 @@ private class InnerWebSocket: Hashable {
security = .none
}

var path = CFURLCopyPath(req.url! as CFURL?) as String
var path = CFURLCopyPath(req.url! as CFURL) as String
if path == "" {
path = "/"
}
Expand Down Expand Up @@ -1069,7 +1071,7 @@ private class InnerWebSocket: Hashable {
var (rdo, wro) : (InputStream?, OutputStream?)
var readStream: Unmanaged<CFReadStream>?
var writeStream: Unmanaged<CFWriteStream>?
CFStreamCreatePairWithSocketToHost(nil, addr[0] as CFString?, UInt32(Int(addr[1])!), &readStream, &writeStream);
CFStreamCreatePairWithSocketToHost(nil, addr[0] as CFString, UInt32(Int(addr[1])!), &readStream, &writeStream);
rdo = readStream!.takeRetainedValue()
wro = writeStream!.takeRetainedValue()
(rd, wr) = (rdo!, wro!)
Expand Down Expand Up @@ -1155,8 +1157,8 @@ private class InnerWebSocket: Hashable {
} else {
key = ""
if let r = line.range(of: ":") {
key = trim(String(line.prefix(upTo: r.lowerBound)))
value = trim(String(line.suffix(from: r.upperBound)))
key = trim(String(line[..<r.lowerBound]))
value = trim(String(line[r.upperBound...]))
}
}

Expand Down Expand Up @@ -1613,38 +1615,43 @@ private enum TCPConnSecurity {
private class Manager {
var queue = DispatchQueue(label: "SwiftWebSocketInstance", attributes: [])
var once = Int()
var mutex = pthread_mutex_t()
var mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
var cond = pthread_cond_t()
var websockets = Set<InnerWebSocket>()
var _nextId = 0
init(){
pthread_mutex_init(&mutex, nil)
mutex.initialize(to: pthread_mutex_t())
pthread_mutex_init(mutex, nil)
pthread_cond_init(&cond, nil)
DispatchQueue(label: "SwiftWebSocket", attributes: []).async {
var wss : [InnerWebSocket] = []
while true {
var wait = true
wss.removeAll()
pthread_mutex_lock(&self.mutex)
pthread_mutex_lock(self.mutex)
for ws in self.websockets {
wss.append(ws)
}
for ws in wss {
self.checkForConnectionTimeout(ws)
if ws.dirty {
pthread_mutex_unlock(&self.mutex)
pthread_mutex_unlock(self.mutex)
ws.step()
pthread_mutex_lock(&self.mutex)
pthread_mutex_lock(self.mutex)
wait = false
}
}
if wait {
_ = self.wait(250)
}
pthread_mutex_unlock(&self.mutex)
pthread_mutex_unlock(self.mutex)
}
}
}
deinit{
pthread_mutex_init(mutex, nil)
mutex.deallocate()
}
func checkForConnectionTimeout(_ ws : InnerWebSocket) {
if ws.rd != nil && ws.wr != nil && (ws.rd.streamStatus == .opening || ws.wr.streamStatus == .opening) {
let age = CFAbsoluteTimeGetCurrent() - ws.createdAt
Expand All @@ -1663,28 +1670,28 @@ private class Manager {
ts.tv_nsec = v1 + v2;
ts.tv_sec += ts.tv_nsec / (1000 * 1000 * 1000);
ts.tv_nsec %= (1000 * 1000 * 1000);
return pthread_cond_timedwait(&self.cond, &self.mutex, &ts)
return pthread_cond_timedwait(&self.cond, self.mutex, &ts)
}
func signal(){
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func add(_ websocket: InnerWebSocket) {
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
websockets.insert(websocket)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func remove(_ websocket: InnerWebSocket) {
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
websockets.remove(websocket)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func nextId() -> Int {
pthread_mutex_lock(&mutex)
defer { pthread_mutex_unlock(&mutex) }
pthread_mutex_lock(mutex)
defer { pthread_mutex_unlock(mutex) }
_nextId += 1
return _nextId
}
Expand All @@ -1693,11 +1700,17 @@ private class Manager {
private let manager = Manager()

/// WebSocket objects are bidirectional network streams that communicate over HTTP. RFC 6455.
@objcMembers
open class WebSocket: NSObject {
fileprivate var ws: InnerWebSocket
fileprivate var id = manager.nextId()
fileprivate var opened: Bool

open override var hash: Int { return id }
open override func isEqual(_ other: Any?) -> Bool {
guard let other = other as? WebSocket else { return false }
return self.id == other.id
}

/// Create a WebSocket connection to a URL; this should be the URL to which the WebSocket server will respond.
public convenience init(_ url: String){
Expand Down Expand Up @@ -1861,6 +1874,7 @@ public func ==(lhs: WebSocket, rhs: WebSocket) -> Bool {

extension WebSocket {
/// The events of the WebSocket using a delegate.
@objc
public var delegate : WebSocketDelegate? {
get { return ws.eventDelegate }
set { ws.eventDelegate = newValue }
Expand Down

0 comments on commit 2a7320d

Please sign in to comment.