diff --git a/libgo/routine_sync/channel.h b/libgo/routine_sync/channel.h index f152d8bb..ee01575c 100644 --- a/libgo/routine_sync/channel.h +++ b/libgo/routine_sync/channel.h @@ -33,10 +33,7 @@ class ChannelImplWithSignal : public DebuggerId> RS_DBG(dbg_channel, "channel=%ld | %s | ptr(t)=0x%p | isWait=%d | abstime=%d | closed=%d | popWaiting_=%p | pushWaiting_=%p", id(), __func__, (void*)&t, isWait, !!abstime, closed_, (void*)popWaiting_, (void*)pushWaiting_); - if (closed_) - return false; - - if (pushWaiting_) { // 有push协程在等待, 读出来 & 清理 + if (pushWaiting_ && pushWaiting_ != (ConditionVariable*)kClosedWaiting) { // 有push协程在等待, 读出来 & 清理 t = *pushQ_; pushQ_ = nullptr; @@ -51,6 +48,9 @@ class ChannelImplWithSignal : public DebuggerId> return true; } + if (closed_) + return false; + if (!isWait) { RS_DBG(dbg_channel, "channel=%ld | %s | not match && not wait | return false", id(), __func__); @@ -68,15 +68,16 @@ class ChannelImplWithSignal : public DebuggerId> id(), __func__); (void)waiting.wait_until_p(lock, abstime, [&]{ return popWaiting_ != &waiting; }); - bool ok = popWaiting_ != &waiting; + bool changed = (popWaiting_ != &waiting); + bool ok = changed && popWaiting_ != (ConditionVariable*)kClosedWaiting; - RS_DBG(dbg_channel, "channel=%ld | %s | waked | matched=%d", - id(), __func__, ok); + RS_DBG(dbg_channel, "channel=%ld | %s | waked | closed=%d | changed=%d | ok=%d", + id(), __func__, closed_, changed, ok); if (ok) { // 成功 t = std::move(temp); // 对外部T的写操作放到本线程来做, 降低使用难度 - } else { + } else if (!changed) { // 超时,清理 popQ_ = nullptr; popWaiting_ = nullptr; @@ -93,13 +94,13 @@ class ChannelImplWithSignal : public DebuggerId> RS_DBG(dbg_channel, "channel=%ld | %s | ptr(t)=0x%p | isWait=%d | abstime=%d | closed=%d | popWaiting_=%p | pushWaiting_=%p", id(), __func__, (void*)&t, isWait, !!abstime, closed_, (void*)popWaiting_, (void*)pushWaiting_); + if (closed_) + return false; + if (!popWaiting_) { return pop_impl_with_signal_noqueued(t, isWait, abstime, lock); } - if (closed_) - return false; - if (!isWait) { RS_DBG(dbg_channel, "channel=%ld | %s | pop contended && not wait | return false", id(), __func__); @@ -119,8 +120,12 @@ class ChannelImplWithSignal : public DebuggerId> } } - RS_DBG(dbg_channel, "channel=%ld | %s | waked | pop idle", - id(), __func__); + RS_DBG(dbg_channel, "channel=%ld | %s | waked | closed=%d | pop idle", + id(), __func__, closed_); + + if (closed_) + return false; + return pop_impl_with_signal_noqueued(t, isWait, abstime, lock); } @@ -132,10 +137,7 @@ class ChannelImplWithSignal : public DebuggerId> RS_DBG(dbg_channel, "channel=%ld | %s | ptr(t)=0x%p | isWait=%d | abstime=%d | closed=%d | popWaiting_=%p | pushWaiting_=%p", id(), __func__, (void*)&t, isWait, !!abstime, closed_, (void*)popWaiting_, (void*)pushWaiting_); - if (closed_) - return false; - - if (popWaiting_) { // 有pop协程在等待, 写入 & 清理 + if (popWaiting_ && popWaiting_ != (ConditionVariable*)kClosedWaiting) { // 有pop协程在等待, 写入 & 清理 *popQ_ = t; popQ_ = nullptr; @@ -150,6 +152,9 @@ class ChannelImplWithSignal : public DebuggerId> return true; } + if (closed_) + return false; + if (!isWait) { RS_DBG(dbg_channel, "channel=%ld | %s | not match && not wait | return false", id(), __func__); @@ -166,18 +171,20 @@ class ChannelImplWithSignal : public DebuggerId> id(), __func__); (void)waiting.wait_until_p(lock, abstime, [&]{ return pushWaiting_ != &waiting; }); - bool ok = pushWaiting_ != &waiting; + bool changed = (pushWaiting_ != &waiting); + bool ok = changed && pushWaiting_ != (ConditionVariable*)kClosedWaiting; - RS_DBG(dbg_channel, "channel=%ld | %s | waked | matched=%d", - id(), __func__, ok); + RS_DBG(dbg_channel, "channel=%ld | %s | waked | closed=%d | changed=%d | ok=%d", + id(), __func__, closed_, changed, ok); if (ok) { // 成功 - } else { + } else if (!changed) { // 超时,清理 pushQ_ = nullptr; pushWaiting_ = nullptr; } + return ok; } @@ -194,6 +201,9 @@ class ChannelImplWithSignal : public DebuggerId> return push_impl_with_signal_noqueued(t, isWait, abstime, lock); } + if (closed_) + return false; + if (!isWait) { RS_DBG(dbg_channel, "channel=%ld | %s | push contended && not wait | return false", id(), __func__); @@ -213,11 +223,36 @@ class ChannelImplWithSignal : public DebuggerId> } } - RS_DBG(dbg_channel, "channel=%ld | %s | waked | push idle", - id(), __func__); + RS_DBG(dbg_channel, "channel=%ld | %s | waked | closed=%d | push idle", + id(), __func__, closed_); + + if (closed_) + return false; + return push_impl_with_signal_noqueued(t, isWait, abstime, lock); } + void impl_with_signal_close(std::unique_lock & lock) + { + long push_wakeup = 0; + long pop_wakeup = 0; + pushQ_ = nullptr; + popQ_ = nullptr; + if (pushWaiting_) { + pushWaiting_->fast_notify_all(lock); + pushWaiting_ = (ConditionVariable*)kClosedWaiting; + push_wakeup = 1; + } + if (popWaiting_) { + popWaiting_->fast_notify_all(lock); + popWaiting_ = (ConditionVariable*)kClosedWaiting; + pop_wakeup = 1; + } + + RS_DBG(dbg_channel, "channel(...)=%ld | %s | no-cap branch | push-wakeup=%ld | pop-wakeup=%ld", + id(), __func__, push_wakeup, pop_wakeup); + } + protected: Mutex mtx_; ConditionVariable pushCv_; @@ -228,6 +263,8 @@ class ChannelImplWithSignal : public DebuggerId> T* popQ_ {nullptr}; ConditionVariable* pushWaiting_ {nullptr}; ConditionVariable* popWaiting_ {nullptr}; + + static const std::size_t kClosedWaiting = (std::size_t)-1; }; template < @@ -248,6 +285,7 @@ class ChannelImpl : public ChannelImplWithSignal using ChannelImplWithSignal::pop_impl_with_signal; using ChannelImplWithSignal::push_impl_with_signal; using ChannelImplWithSignal::id; + using ChannelImplWithSignal::impl_with_signal_close; explicit ChannelImpl(std::size_t capacity = 0) : cap_(capacity) @@ -302,22 +340,16 @@ class ChannelImpl : public ChannelImplWithSignal std::unique_lock lock(mtx_); closed_ = true; if (!cap_) { - pushQ_ = nullptr; - popQ_ = nullptr; - if (pushWaiting_) { - pushWaiting_->fast_notify_all(lock); - pushWaiting_ = nullptr; - } - if (popWaiting_) { - popWaiting_->fast_notify_all(lock); - popWaiting_ = nullptr; - } + impl_with_signal_close(lock); } - pushCv_.fast_notify_all(lock); - popCv_.fast_notify_all(lock); - QueueT q; - std::swap(q, q_); + long push_wakeup = pushCv_.fast_notify_all(lock); + long pop_wakeup = popCv_.fast_notify_all(lock); + (void)push_wakeup; + (void)pop_wakeup; + + RS_DBG(dbg_channel, "channel(queue)=%ld | %s | cap=%lu | size=%lu | push-wakeup=%ld | pop-wakeup=%ld", + id(), __func__, cap_, q_.size(), push_wakeup, pop_wakeup); } private: @@ -349,7 +381,7 @@ class ChannelImpl : public ChannelImplWithSignal return false; } - auto p = [this]{ return q_.size() < cap_; }; + auto p = [this]{ return q_.size() < cap_ || closed_; }; RS_DBG(dbg_channel, "channel(queue)=%ld | %s | begin wait", id(), __func__); @@ -404,7 +436,7 @@ class ChannelImpl : public ChannelImplWithSignal return false; } - auto p = [this]{ return !q_.empty(); }; + auto p = [this]{ return !q_.empty() || closed_; }; RS_DBG(dbg_channel, "channel(queue)=%ld | %s | begin wait", id(), __func__); @@ -418,7 +450,7 @@ class ChannelImpl : public ChannelImplWithSignal if (status == std::cv_status::timeout) return false; - if (closed_) + if (q_.empty()) return false; t = q_.front(); @@ -454,6 +486,7 @@ class ChannelImpl : public ChannelImplWithSignal using ChannelImplWithSignal::pop_impl_with_signal; using ChannelImplWithSignal::push_impl_with_signal; using ChannelImplWithSignal::id; + using ChannelImplWithSignal::impl_with_signal_close; explicit ChannelImpl(std::size_t capacity = 0) : cap_(capacity), count_(0) @@ -508,21 +541,16 @@ class ChannelImpl : public ChannelImplWithSignal std::unique_lock lock(mtx_); closed_ = true; if (!cap_) { - pushQ_ = nullptr; - popQ_ = nullptr; - if (pushWaiting_) { - pushWaiting_->fast_notify_all(lock); - pushWaiting_ = nullptr; - } - if (popWaiting_) { - popWaiting_->fast_notify_all(lock); - popWaiting_ = nullptr; - } + impl_with_signal_close(lock); } - pushCv_.fast_notify_all(lock); - popCv_.fast_notify_all(lock); - count_ = 0; + long push_wakeup = pushCv_.fast_notify_all(lock); + long pop_wakeup = popCv_.fast_notify_all(lock); + (void)push_wakeup; + (void)pop_wakeup; + + RS_DBG(dbg_channel, "channel(void)=%ld | %s | cap=%lu | size=%lu | push-wakeup=%ld | pop-wakeup=%ld", + id(), __func__, cap_, count_, push_wakeup, pop_wakeup); } private: @@ -554,7 +582,7 @@ class ChannelImpl : public ChannelImplWithSignal return false; } - auto p = [this]{ return count_ < cap_; }; + auto p = [this]{ return count_ < cap_ || closed_; }; RS_DBG(dbg_channel, "channel(void)=%ld | %s | begin wait", id(), __func__); @@ -607,7 +635,7 @@ class ChannelImpl : public ChannelImplWithSignal return false; } - auto p = [this]{ return count_ > 0; }; + auto p = [this]{ return count_ > 0 || closed_; }; RS_DBG(dbg_channel, "channel(void)=%ld | %s | begin wait", id(), __func__); @@ -621,7 +649,7 @@ class ChannelImpl : public ChannelImplWithSignal if (status == std::cv_status::timeout) return false; - if (closed_) + if (count_ <= 0) return false; --count_; diff --git a/test/gtest_unit/channel.cpp b/test/gtest_unit/channel.cpp index c9ea9251..1bf46778 100644 --- a/test/gtest_unit/channel.cpp +++ b/test/gtest_unit/channel.cpp @@ -4,7 +4,8 @@ #include #include #include -//#define OPEN_ROUTINE_SYNC_DEBUG 1 +#include +#define OPEN_ROUTINE_SYNC_DEBUG 1 #include "coroutine.h" #include "gtest_exit.h" using namespace std::chrono; @@ -22,8 +23,8 @@ using namespace co; TEST(Channel, capacity0) { - // co_opt.debug = dbg_channel | dbg_rutex | dbg_mutex; - co_opt.debug_output = fopen("a.log", "w"); +// co_opt.debug = dbg_channel | dbg_rutex | dbg_mutex; +// co_opt.debug_output = fopen("a.log", "w"); co_chan ch; EXPECT_TRUE(ch.empty()); @@ -660,3 +661,249 @@ TEST(Channel, capacity0Timed) delete[] p; } } + +TEST(Channel, read_waiting_close) +{ +// co_opt.debug = dbg_channel | dbg_rutex | dbg_mutex | dbg_cond_v; +// co_opt.debug_output = fopen("a.log", "w"); + + { + co_chan ch(3); + + go [=] { + int v = 0; + bool b = ch.pop(v); + EXPECT_FALSE(b); + }; + + usleep(100 * 1000); + + ch.close(); + + WaitUntilNoTask(); + + int v = 0; + bool b = ch.pop(v); + EXPECT_FALSE(b); + } + + { + co_chan ch(3); + + ch.push(1); + ch.push(2); + ch.push(3); + + go [=] { + int v = 0; + bool b = ch.push(v); + EXPECT_FALSE(b); + }; + + usleep(100 * 1000); + + ch.close(); + + WaitUntilNoTask(); + + int v = 0; + bool b = ch.push(v); + EXPECT_FALSE(b); + } + + { + co_chan ch; + + go [=] { + int v = 0; + bool b = ch.pop(v); + EXPECT_FALSE(b); + }; + + usleep(100 * 1000); + + ch.close(); + + WaitUntilNoTask(); + + int v = 0; + bool b = ch.pop(v); + EXPECT_FALSE(b); + } + + { + co_chan ch(3); + + go [=] { + bool b = ch.pop(nullptr); + EXPECT_FALSE(b); + }; + + usleep(100 * 1000); + + ch.close(); + + WaitUntilNoTask(); + + bool b = ch.pop(nullptr); + EXPECT_FALSE(b); + } + + { + co_chan ch; + + go [=] { + bool b = ch.pop(nullptr); + EXPECT_FALSE(b); + }; + + usleep(100 * 1000); + + ch.close(); + + WaitUntilNoTask(); + + bool b = ch.pop(nullptr); + EXPECT_FALSE(b); + } +} + +TEST(Channel, read_after_close) +{ + { + co_chan ch(3); + + go [=] { + for (int i = 0; i < 3; ++i) { + GTimer t; + int v = 0; + bool ok = ch.pop(v); + EXPECT_TRUE(ok); + EXPECT_EQ(v, i + 1); + } + + int v = 0; + bool b = ch.pop(v); + EXPECT_FALSE(b); + }; + + usleep(100 * 1000); + + ch.push(1); + ch.push(2); + ch.push(3); + ch.close(); + + WaitUntilNoTask(); + + int v = 0; + bool b = ch.pop(v); + EXPECT_FALSE(b); + } + + { + co_chan ch(3); + + ch.push(1); + ch.push(2); + ch.push(3); + + go [=] { + int v = 0; + bool b = ch.push(v); + EXPECT_FALSE(b); + }; + + usleep(100 * 1000); + + int v; + ch.pop(v); + ch.pop(v); + ch.pop(v); + ch.close(); + + WaitUntilNoTask(); + } + + { + co_chan ch(3); + + ch.push(1); + ch.push(2); + ch.push(3); + ch.close(); + + int v; + bool b; + b = ch.pop(v); + EXPECT_TRUE(b); + EXPECT_EQ(v, 1); + + b = ch.pop(v); + EXPECT_TRUE(b); + EXPECT_EQ(v, 2); + + b = ch.pop(v); + EXPECT_TRUE(b); + EXPECT_EQ(v, 3); + + b = ch.pop(v); + EXPECT_FALSE(b); + } + + { + co_chan ch(3); + + ch.push(nullptr); + ch.push(nullptr); + ch.push(nullptr); + ch.close(); + + bool b; + b = ch.pop(nullptr); + EXPECT_TRUE(b); + + b = ch.pop(nullptr); + EXPECT_TRUE(b); + + b = ch.pop(nullptr); + EXPECT_TRUE(b); + + b = ch.pop(nullptr); + EXPECT_FALSE(b); + } + + { + co_chan ch; + + go [=] { + int v = 0; + bool b = ch.pop(v); + EXPECT_TRUE(b); + EXPECT_EQ(v, 1); + }; + + usleep(100 * 1000); + + ch.push(1); + ch.close(); + + WaitUntilNoTask(); + } + + { + co_chan ch; + + go [=] { + bool b = ch.pop(nullptr); + EXPECT_TRUE(b); + }; + + usleep(100 * 1000); + + ch.push(nullptr); + ch.close(); + + WaitUntilNoTask(); + } +}