threading/channels.Chan is not free of data-races according to thread-sanitizers #55
Here's what's happening in the optimized send path:

```nim
if chan.isFull(): return false
acquire(chan.lock)
if chan.isFull():
  release(chan.lock)
  return false
```

My understanding of the logic, from when I was refactoring the code earlier, is that the first check is just a fast path: only the second check, done under the lock, actually decides anything. So, if I'm not forgetting some edge case it's supposed to work for, this is an optimization and nothing more. It would be beneficial to see if it really achieves any improved performance in real world cases (or at least tests). It's also interesting to see what tsan has to say about the original C code, where basically the same order of operations happens. I don't know if …
Hmmm, I'll look deeper into this to philosophize about it.
BTW, I understand the code is just a minimal reproducing snippet, but just in case, for others reading: try to avoid calling …
For reference for any curious future reader, I wanted to have some kind of data to back up the assertion that this if-check was valid. So, with some feedback from Zoom, I wrote an example that creates my own versions of trySend and tryRecv, one with and one without this extra if-check. I also added a counter that gets incremented whenever an acquire happens that would've otherwise been prevented by the "unsafe" version of the check.

What this does is spawn N producer threads and M consumer threads. Producer threads just collectively try to send 1000 messages through a channel of size X (either 1, 5, 10, 50, 100 or 1000), while consumer threads try to read and atomically increment a counter. This is done 1000 times per configuration of threads and channel sizes, and that "microbenchmark" is run once for the "Optimized" version (containing the data-race) and once for the "Safe" version. It spits the result out as CSV, basically how long sending those 1000 messages took on average.

The "benchmark" code... way too long:

```nim
import threading/channels {.all.}
import std/[locks, os, math, algorithm, strutils, strformat, sequtils, isolation, atomics, times]

# === SETUP START === #
var sendLockAcquireInst: Atomic[int]

proc customChannelSend(chan: ChannelRaw, data: pointer, size: int, blocking: static bool, optimize: static bool): bool =
  assert not chan.isNil
  assert not data.isNil
  when optimize:
    if chan.isFull(): return false
  acquire(chan.lock)
  # check for when another thread was faster to fill
  when blocking:
    while chan.isFull():
      wait(chan.spaceAvailableCV, chan.lock)
  else:
    if chan.isFull():
      sendLockAcquireInst.atomicInc
      release(chan.lock)
      return false
  assert not chan.isFull()
  let writeIdx =
    if chan.head < chan.slots: chan.head
    else: chan.head - chan.slots
  copyMem(chan.buffer[writeIdx * size].addr, data, size)
  inc(chan.head)
  if chan.head == 2 * chan.slots:
    chan.head = 0
  signal(chan.dataAvailableCV)
  release(chan.lock)
  result = true

proc trySendSafe*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} =
  ## Sends item to the channel (non-blocking).
  ## "Safe" version: no pre-lock check, so no data-race.
  var data = src.extract
  result = customChannelSend(c.d, data.unsafeAddr, sizeof(T), false, false)
  if result:
    wasMoved(data)

proc trySendOpt*[T](c: Chan[T], src: sink Isolated[T]): bool {.inline.} =
  ## Sends item to the channel (non-blocking).
  ## "Optimized" version: pre-lock check, contains the data-race.
  var data = src.extract
  result = customChannelSend(c.d, data.unsafeAddr, sizeof(T), false, true)
  if result:
    wasMoved(data)

var receiveLockAcquireInst: Atomic[int]

proc customChannelReceive(chan: ChannelRaw, data: pointer, size: int, blocking: static bool, optimize: static bool): bool =
  assert not chan.isNil
  assert not data.isNil
  when optimize:
    if chan.isEmpty(): return false
  acquire(chan.lock)
  # check for when another thread was faster to empty
  when blocking:
    while chan.isEmpty():
      wait(chan.dataAvailableCV, chan.lock)
  else:
    if chan.isEmpty():
      receiveLockAcquireInst.atomicInc
      release(chan.lock)
      return false
  assert not chan.isEmpty()
  let readIdx =
    if chan.tail < chan.slots: chan.tail
    else: chan.tail - chan.slots
  copyMem(data, chan.buffer[readIdx * size].addr, size)
  inc(chan.tail)
  if chan.tail == 2 * chan.slots:
    chan.tail = 0
  signal(chan.spaceAvailableCV)
  release(chan.lock)
  result = true

proc tryRecvSafe*[T](c: Chan[T], dst: var T): bool {.inline.} =
  ## Receives item from the channel (non-blocking). "Safe" version.
  customChannelReceive(c.d, dst.addr, sizeof(T), false, false)

proc tryRecvOpt*[T](c: Chan[T], dst: var T): bool {.inline.} =
  ## Receives item from the channel (non-blocking). "Optimized" version.
  customChannelReceive(c.d, dst.addr, sizeof(T), false, true)
# === SETUP END === #

type Thing = ref object
  txt: string

const MESSAGE_COUNT = 1_000
const ITERATION_COUNT = 1_000
const MAX_THREAD_COUNT = 5
const CHANNEL_SIZES = @[
  1,
  5,
  10,
  50,
  100,
  1_000,
]

var IS_RUNNING = true
let msgs = (1..MESSAGE_COUNT).mapIt(Thing(txt: "#" & $it))

type ThreadData = object
  channelIndex: 0..CHANNEL_SIZES.len()
  threadIndex: 0..4
  threadCount: 1..5

var consumerThreads: seq[Thread[ThreadData]] = (1..MAX_THREAD_COUNT).mapIt(Thread[ThreadData]())
var producerThreads: seq[Thread[ThreadData]] = (1..MAX_THREAD_COUNT).mapIt(Thread[ThreadData]())

var optChannels = CHANNEL_SIZES.mapIt(newChan[Thing](it))
var optCounter: Atomic[int]

proc optLoop(data: ThreadData) {.thread, nimcall.} =
  {.gcsafe.}:
    while optCounter.load() < MESSAGE_COUNT:
      var msg: Thing
      let hasMsg = optChannels[data.channelIndex].tryRecvOpt(msg)
      if hasMsg:
        optCounter.atomicInc

var safeChannels = CHANNEL_SIZES.mapIt(newChan[Thing](it))
var safeCounter: Atomic[int]

proc safeLoop(data: ThreadData) {.thread, nimcall.} =
  {.gcsafe.}:
    while safeCounter.load() < MESSAGE_COUNT:
      var msg: Thing
      let hasMsg = safeChannels[data.channelIndex].tryRecvSafe(msg)
      if hasMsg:
        safeCounter.atomicInc

proc clientOptLoop(data: ThreadData) {.thread, nimcall.} =
  {.gcsafe.}:
    for index, msg in msgs:
      if index.mod(data.threadCount) != data.threadIndex:
        continue
      while not optChannels[data.channelIndex].trySendOpt(unsafeIsolate(msg)):
        discard

proc clientSafeLoop(data: ThreadData) {.thread, nimcall.} =
  {.gcsafe.}:
    for index, msg in msgs:
      if index.mod(data.threadCount) != data.threadIndex:
        continue
      while not safeChannels[data.channelIndex].trySendSafe(unsafeIsolate(msg)):
        discard

proc clean() =
  optCounter.store(0)
  safeCounter.store(0)
  receiveLockAcquireInst.store(0)
  sendLockAcquireInst.store(0)
  for chan in optChannels:
    assert chan.peek == 0
  for chan in safeChannels:
    assert chan.peek == 0

template benchBlock(name: string; iterations, consumerThreadCount, producerThreadCount: int; callBlock: untyped) =
  var benchtimes: seq[float] = @[]
  for _ in 1..iterations:
    clean()
    var t0 = cpuTime()
    callBlock
    var t1 = cpuTime()
    benchtimes.add((t1 - t0) * 1000)
  benchtimes.sort()
  let medianIndex: int = int(benchtimes.high / 2)
  let median: float = benchtimes[medianIndex]
  let max: float = benchtimes[^1]
  let min: float = benchtimes[0]
  let avg: float = benchtimes.sum() / iterations.float
  # `index` resolves at the instantiation site (the channel-size loop in main)
  let channelSize: int = CHANNEL_SIZES[index]
  let columns = @[
    name,
    $consumerThreadCount,
    $producerThreadCount,
    $channelSize,
    $sendLockAcquireInst.load(),
    $receiveLockAcquireInst.load(),
    $median,
    $avg,
    $max,
    $min
  ].join(",")
  echo columns

proc main() =
  let columns = @[
    "Type",
    "ConsumerThreadCount",
    "ProducerThreadCount",
    "ChannelSize",
    "Bad Send Acquires",
    "Bad Recv Acquires",
    "Median (µs)",
    "Avg (µs)",
    "Max (µs)",
    "Min (µs)"
  ].join(",")
  echo "MessageCountPerIteration,", MESSAGE_COUNT, ",TotalMessageCount,", MESSAGE_COUNT * ITERATION_COUNT, "\n"
  echo columns
  for consumerThreadCount in 1..consumerThreads.high:
    for producerThreadCount in 1..producerThreads.high:
      for index in 0..CHANNEL_SIZES.high:
        benchBlock("Optimized", ITERATION_COUNT, consumerThreadCount, producerThreadCount):
          for threadIndex in 0..(consumerThreadCount - 1):
            createThread(consumerThreads[threadIndex], optLoop, ThreadData(channelIndex: index, threadIndex: threadIndex, threadCount: consumerThreadCount))
          for threadIndex in 0..(producerThreadCount - 1):
            createThread(producerThreads[threadIndex], clientOptLoop, ThreadData(channelIndex: index, threadIndex: threadIndex, threadCount: producerThreadCount))
          while optCounter.load() < MESSAGE_COUNT: sleep(0)
          for threadIndex in 0..(producerThreadCount - 1):
            joinThread(producerThreads[threadIndex])
          for threadIndex in 0..(consumerThreadCount - 1):
            joinThread(consumerThreads[threadIndex])
        benchBlock("Safe", ITERATION_COUNT, consumerThreadCount, producerThreadCount):
          for threadIndex in 0..(consumerThreadCount - 1):
            createThread(consumerThreads[threadIndex], safeLoop, ThreadData(channelIndex: index, threadIndex: threadIndex, threadCount: consumerThreadCount))
          for threadIndex in 0..(producerThreadCount - 1):
            createThread(producerThreads[threadIndex], clientSafeLoop, ThreadData(channelIndex: index, threadIndex: threadIndex, threadCount: producerThreadCount))
          while safeCounter.load() < MESSAGE_COUNT: sleep(0)
          for threadIndex in 0..(producerThreadCount - 1):
            joinThread(producerThreads[threadIndex])
          for threadIndex in 0..(consumerThreadCount - 1):
            joinThread(consumerThreads[threadIndex])

main()
echo "Done"
```

In general this is a limited benchmark, since it includes spawning and joining the threads inside the measured block.
All shared variable accesses should be protected by atomics or locks or ASAN will complain. head and tail can be atomics, and use …
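To illustrate the suggestion, here is a minimal sketch of what atomic head/tail fields with relaxed pre-checks could look like. The type, field and proc names, and the simplified ring-buffer layout (indices wrapping at 2 * slots, as in the snippets above) are assumptions for the example, not the actual threading/channels source:

```nim
import std/atomics

type ChannelState = object
  slots: int
  head, tail: Atomic[int] # only mutated under the lock, but read lock-free

proc numItems(c: var ChannelState): int =
  # Relaxed ordering is enough for a heuristic pre-check:
  # the re-check done under the lock stays the single source of truth.
  let diff = c.head.load(moRelaxed) - c.tail.load(moRelaxed)
  if diff < 0: diff + 2 * c.slots else: diff

proc isFull(c: var ChannelState): bool = c.numItems == c.slots
proc isEmpty(c: var ChannelState): bool = c.numItems == 0
```

Since the loads are atomic, TSAN no longer sees a plain read racing against a locked write; the pre-check can still return a stale answer, which is fine because the locked check decides.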
TSAN is the particular thing complaining here ;-) But I agree, that could work nicely. I'd be happy to write up a PR on this one if @ZoomRmc isn't already on it. Either way, I think it might make sense to introduce general asan/tsan checks, just to be able to provide hard guarantees like "This has no data-races and does not leak!".
Head and tail on ChannelRaw get accessed in channelSend and channelReceive before a lock acquisition happens in those procs. That means the output of the templates they are used in (isEmpty/isFull) depends on a data-race: a thread holding the lock may modify those values while they are being read by another thread that does not hold it. This is noticed by thread-sanitizer and should be eliminated for the following reasons:

- Users should not catch "false positives" from thread-sanitizer such as this, which they currently will
- It is generally a good idea for multi-threaded code such as this to be data-race free

This refactor to atomics makes these accesses atomic and thus free of data-races.
Tail and head are now atomics and gained getter/setter procs for better readability. For consistency, the same was done for the atomicCounter field.
This allows spotting data-races in the threading code automatically
@mratsim Could you take a look at Zoom's comment in #56 (comment)?
Using the memory order "Release" here enables a data-race according to tsan, though that race is *potentially* a false positive. The race it identifies is between two threads both trying to destroy the smartPtr at the same time: both call the decr proc in smartptrs.nim:92 and get past the nil check, and then one of them might get past the second if-check and deallocate the ptr. However, that should not be able to lead to a data race: the count of 0 can only be reached *once*, by the last thread. I can only assume compiler reordering may enable a race here. Using the default mode (SeqCst) as well as Acquire-Release (AcqRel) gets rid of said data-race.
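For context, this is the classic atomic refcount teardown pattern. A minimal sketch (hypothetical names, not the actual smartptrs.nim code) of the decrement with acquire-release ordering:

```nim
import std/atomics

type Payload = object
  counter: Atomic[int]
  data: int

proc decr(p: ptr Payload) =
  if p != nil:
    # fetchSub returns the PREVIOUS value, so seeing 1 means we just
    # dropped the count to 0 and are the last owner. Acquire-release
    # makes all writes that happened before the other threads'
    # decrements visible to the thread that frees the memory.
    if p.counter.fetchSub(1, moAcquireRelease) == 1:
      deallocShared(p)
```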
Clang does not support tsan on Windows. The checks for Linux and macOS should suffice.
i386 is a 32-bit platform, while Clang builds 64-bit binaries by default. It should be as simple as requiring clang to build in 32-bit mode with -m32.
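For illustration, passing that through the Nim compiler could look like this (an example invocation, not the actual CI line; the file name is a placeholder):

nim c --cc:clang --passc:-m32 --passl:-m32 tests/mytest.nim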
…hilippMDoerner/threading into bugfix/nim-lang#55-refactor-to-atomics
Heyho, while working to rid my own package of data-races, I noticed that this example makes thread-sanitizer (tsan) report a data-race:
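A rough reconstruction of the kind of program in question, pieced together from the stack trace at the bottom (the payload type and loop shapes are guesses, not the original snippet):

```nim
import threading/channels

var chan = newChan[string](1)

proc loop() {.thread.} =
  var msg: string
  while true:
    # tryRecv spinning in one thread...
    if chan.tryRecv(msg):
      break

proc main() =
  var receiver: Thread[void]
  createThread(receiver, loop)
  # ...while trySend spins in the main thread: tsan flags the unlocked
  # isFull/isEmpty reads of head/tail that these two paths perform.
  while not chan.trySend("hello"):
    discard
  joinThread(receiver)

main()
```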
Compiled with:
nim r --cc:clang --mm:orc -d:release -d:useMalloc -f --passc:"-fsanitize=thread -fno-omit-frame-pointer -mno-omit-leaf-frame-pointer" --debugger:native --passl:"-fsanitize=thread -fno-omit-frame-pointer -mno-omit-leaf-frame-pointer" src/playground.nim
tsan sees a data-race here between trySend with channelSend in one thread and tryRecv with channelReceive in the other. I'm not entirely sure how to interpret that, but I would assume it is not a false positive.

Even if the data-race does not pose a real problem, I would want to solve it, because users of the lib (like me) will want to tsan-check their own applications to make them as stable as can be.
I'm missing too much of an understanding of threading/channels.Chan's inner workings to be able to tell if that is actually the case, or what these two are racing over.
Edit: Beef stated he might've seen what caused the issue. I quote: "isFull is called before a lock happens". Would changing that be the solution here? Leorize suggested using try-acquire to get that lock, or approaches without an if there, though I can't picture what those would look like.
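For what it's worth, my reading of the try-acquire suggestion as a self-contained sketch (a plain seq standing in for the channel's ring buffer, nothing taken from the library):

```nim
import std/locks

var
  bufLock: Lock
  buf: seq[int]
const Cap = 4

initLock(bufLock)

proc trySendSketch(item: int): bool =
  ## Non-blocking send: failing to get the lock is treated like a full
  ## channel, so shared state is never read without holding the lock.
  if not tryAcquire(bufLock):
    return false
  try:
    if buf.len >= Cap: # checked only under the lock: no race
      return false
    buf.add(item)
    result = true
  finally:
    release(bufLock)

echo trySendSketch(42) # -> true
```

The trade-off: a failed tryAcquire reports "not sent" even when the channel has space, which changes the semantics of a non-blocking send slightly.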
tsan stacktrace
```txt
WARNING: ThreadSanitizer: data race (pid=2235220)

Write of size 8 at 0x7b2c00000090 by main thread (mutexes: write M0):
#0 channelSend__playground_u356 /home/philipp/dev/playground/channels.nim:192:130 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xed985) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#1 trySend__playground_u329 /home/philipp/.nimble/pkgs2/threading-0.2.0-3cd4360369b8abf1c53ddfd49ea8aef70208658c/threading/channels.nim:271:11 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xedf37) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#2 main__playground_u277 /home/philipp/dev/playground/src/playground.nim:16:27 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xedf37)
#3 NimMainModule /home/philipp/dev/playground/src/playground.nim:19:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee320) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#4 NimMainInner /home/philipp/dev/playground/src/playground.nim:41:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee320)
#5 NimMain /home/philipp/dev/playground/src/playground.nim:52:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee320)
#6 main /home/philipp/dev/playground/src/playground.nim:60:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee320)

Previous read of size 8 at 0x7b2c00000090 by thread T1:
#0 channelReceive__playground_u179 /home/philipp/.nimble/pkgs2/threading-0.2.0-3cd4360369b8abf1c53ddfd49ea8aef70208658c/threading/channels.nim:205:22 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xed609) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#1 tryRecv__playground_u167 /home/philipp/.nimble/pkgs2/threading-0.2.0-3cd4360369b8abf1c53ddfd49ea8aef70208658c/threading/channels.nim:281:11 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xedcab) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#2 loop__playground_u165 /home/philipp/dev/playground/src/playground.nim:10:57 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xedcab)
#3 threadProcWrapDispatch__stdZtypedthreads_u105 /home/philipp/.choosenim/toolchains/nim-2.0.2/lib/system/threadimpl.nim:66:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xed059) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#4 threadProcWrapStackFrame__stdZtypedthreads_u95 /home/philipp/.choosenim/toolchains/nim-2.0.2/lib/system/threadimpl.nim:95:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xed059)
#5 threadProcWrapper__stdZtypedthreads_u81 /home/philipp/.choosenim/toolchains/nim-2.0.2/lib/system/threadimpl.nim:101:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xe7b45) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
Location is heap block of size 176 at 0x7b2c00000000 allocated by main thread:
#0 malloc (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0x79b63) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#1 allocChannel__OOZOOZOOZOnimbleZpkgs50Zthreading4548O50O484551cd52515448515457b56abf49c5351ddfd5257ea56aef5548504856545356cZthreadingZchannels_u40 /home/philipp/.nimble/pkgs2/threading-0.2.0-3cd4360369b8abf1c53ddfd49ea8aef70208658c/threading/channels.nim:134:24 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xed41c) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#2 newChan__playground_u4 /home/philipp/.nimble/pkgs2/threading-0.2.0-3cd4360369b8abf1c53ddfd49ea8aef70208658c/threading/channels.nim:316:13 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xed54a) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#3 NimMainModule /home/philipp/dev/playground/src/playground.nim:4:85 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee2f6) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#4 NimMainInner /home/philipp/dev/playground/src/playground.nim:41:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee2f6)
#5 NimMain /home/philipp/dev/playground/src/playground.nim:52:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee2f6)
#6 main /home/philipp/dev/playground/src/playground.nim:60:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee2f6)
Mutex M0 (0x7b2c00000000) created at:
#0 pthread_mutex_init (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0x99f78) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#1 initLock__coreZlocks_u7 /home/philipp/.choosenim/toolchains/nim-2.0.2/lib/core/locks.nim:38:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xed45e) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#2 allocChannel__OOZOOZOOZOnimbleZpkgs50Zthreading4548O50O484551cd52515448515457b56abf49c5351ddfd5257ea56aef5548504856545356cZthreadingZchannels_u40 /home/philipp/.nimble/pkgs2/threading-0.2.0-3cd4360369b8abf1c53ddfd49ea8aef70208658c/threading/channels.nim:139:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xed45e)
#3 newChan__playground_u4 /home/philipp/.nimble/pkgs2/threading-0.2.0-3cd4360369b8abf1c53ddfd49ea8aef70208658c/threading/channels.nim:316:13 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xed54a) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#4 NimMainModule /home/philipp/dev/playground/src/playground.nim:4:85 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee2f6) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#5 NimMainInner /home/philipp/dev/playground/src/playground.nim:41:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee2f6)
#6 NimMain /home/philipp/dev/playground/src/playground.nim:52:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee2f6)
#7 main /home/philipp/dev/playground/src/playground.nim:60:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee2f6)
Thread T1 (tid=2235222, running) created by main thread at:
#0 pthread_create (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0x64a36) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#1 createThread__stdZtypedthreads_u60 /home/philipp/.choosenim/toolchains/nim-2.0.2/lib/std/typedthreads.nim:246:103 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xe7c79) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#2 createThread__stdZtypedthreads_u51 /home/philipp/.choosenim/toolchains/nim-2.0.2/lib/std/typedthreads.nim:262:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xe7d70) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#3 main__playground_u277 /home/philipp/dev/playground/src/playground.nim:15:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xede8a) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#4 NimMainModule /home/philipp/dev/playground/src/playground.nim:19:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee320) (BuildId: 92969305384c4f84ef7e6297746e94551b123ec1)
#5 NimMainInner /home/philipp/dev/playground/src/playground.nim:41:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee320)
#6 NimMain /home/philipp/dev/playground/src/playground.nim:52:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee320)
#7 main /home/philipp/dev/playground/src/playground.nim:60:2 (playground_1B88682F16D8AFE3768CDB5129F5ABAE744E02A7+0xee320)
SUMMARY: ThreadSanitizer: data race /home/philipp/dev/playground/channels.nim:192:130 in channelSend__playground_u356
ThreadSanitizer: reported 1 warnings
```