From b69861fb3957bfd00f2c2c4a3b23290f0a4fb6ef Mon Sep 17 00:00:00 2001 From: guonaihong Date: Tue, 15 Aug 2023 18:54:50 +0800 Subject: [PATCH] fixbug --- min_heap.go | 14 +++++------ min_heap_node.go | 20 +++++++++------- min_heap_node_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++ min_heap_test.go | 32 ++++++++----------------- 4 files changed, 82 insertions(+), 39 deletions(-) create mode 100644 min_heap_node_test.go diff --git a/min_heap.go b/min_heap.go index 9d42ffc..1a8a9b3 100644 --- a/min_heap.go +++ b/min_heap.go @@ -18,7 +18,7 @@ type minHeap struct { ctx context.Context cancel context.CancelFunc wait sync.WaitGroup - runCount uint32 //测试时使用 + runCount uint32 // 测试时使用 } // 一次性定时器 @@ -60,14 +60,13 @@ func (m *minHeap) addCallback(expire time.Duration, n Next, callback func(), isS node.absExpire = n.Next(time.Now()) } - heap.Push(&m.minHeaps, node) + heap.Push(&m.minHeaps, &node) select { case m.chAdd <- struct{}{}: default: } return &node - } func (m *minHeap) removeTimeNode(node *minHeapNode) { @@ -84,7 +83,6 @@ func (m *minHeap) removeTimeNode(node *minHeapNode) { // 运行 // 为了避免空转cpu, 会等待一个chan, 只要AfterFunc或者ScheduleFunc被调用就会往这个chan里面写值 func (m *minHeap) Run() { - timeout := time.Hour tm := time.NewTimer(timeout) for { @@ -100,7 +98,7 @@ func (m *minHeap) Run() { } for { - first := &m.minHeaps[0] + first := m.minHeaps[0] // 时间未到直接过滤掉 if !now.After(first.absExpire) { @@ -112,8 +110,9 @@ func (m *minHeap) Run() { first.absExpire = first.Next(now) heap.Fix(&m.minHeaps, first.index) } else { - m.minHeaps.Pop() + heap.Pop(&m.minHeaps) } + atomic.AddUint32(&m.runCount, 1) go callback() if m.minHeaps.Len() == 0 { @@ -123,7 +122,7 @@ func (m *minHeap) Run() { } } - first := &m.minHeaps[0] + first := m.minHeaps[0] if time.Now().Before(first.absExpire) { to := time.Duration(math.Abs(float64(time.Since(m.minHeaps[0].absExpire)))) tm.Reset(to) @@ -146,7 +145,6 @@ func (m *minHeap) Run() { return } next: - atomic.AddUint32(&m.runCount, 1) } } diff --git a/min_heap_node.go b/min_heap_node.go index 597bc46..3a434c3 100644 --- a/min_heap_node.go +++ b/min_heap_node.go @@ -5,12 +5,12 @@ import ( ) type minHeapNode struct { - callback func() //用户的callback - absExpire time.Time //绝对时间 - userExpire time.Duration //过期时间段 - next Next //自定义下个触发的时间点 - isSchedule bool //是否是周期性任务 - index int //在min heap中的索引,方便删除或者重新推入堆中 + callback func() // 用户的callback + absExpire time.Time // 绝对时间 + userExpire time.Duration // 过期时间段 + next Next // 自定义下个触发的时间点 + isSchedule bool // 是否是周期性任务 + index int // 在min heap中的索引,方便删除或者重新推入堆中 root *minHeap } @@ -25,10 +25,12 @@ func (m *minHeapNode) Next(now time.Time) time.Time { return now.Add(m.userExpire) } -type minHeaps []minHeapNode +type minHeaps []*minHeapNode + +func (m minHeaps) Len() int { return len(m) } -func (m minHeaps) Len() int { return len(m) } func (m minHeaps) Less(i, j int) bool { return m[i].absExpire.Before(m[j].absExpire) } + func (m minHeaps) Swap(i, j int) { m[i], m[j] = m[j], m[i] m[i].index = i @@ -38,7 +40,7 @@ func (m minHeaps) Swap(i, j int) { func (m *minHeaps) Push(x any) { // Push and Pop use pointer receivers because they modify the slice's length, // not just its contents. - *m = append(*m, x.(minHeapNode)) + *m = append(*m, x.(*minHeapNode)) lastIndex := len(*m) - 1 (*m)[lastIndex].index = lastIndex } diff --git a/min_heap_node_test.go b/min_heap_node_test.go new file mode 100644 index 0000000..2776c0b --- /dev/null +++ b/min_heap_node_test.go @@ -0,0 +1,55 @@ +package timer + +import ( + "container/heap" + "testing" + "time" +) + +func Test_MinHeap(t *testing.T) { + t.Run("", func(t *testing.T) { + var mh minHeaps + now := time.Now() + n1 := minHeapNode{ + absExpire: now.Add(time.Second), + userExpire: 1 * time.Second, + } + + n2 := minHeapNode{ + absExpire: now.Add(2 * time.Second), + userExpire: 2 * time.Second, + } + + n3 := minHeapNode{ + absExpire: now.Add(3 * time.Second), + userExpire: 3 * time.Second, + } + + n6 := minHeapNode{ + absExpire: now.Add(6 * time.Second), + userExpire: 6 * time.Second, + } + n5 := minHeapNode{ + absExpire: now.Add(5 * time.Second), + userExpire: 5 * time.Second, + } + n4 := minHeapNode{ + absExpire: now.Add(4 * time.Second), + userExpire: 4 * time.Second, + } + mh.Push(&n1) + mh.Push(&n2) + mh.Push(&n3) + mh.Push(&n6) + mh.Push(&n5) + mh.Push(&n4) + + for i := 1; len(mh) > 0; i++ { + v := heap.Pop(&mh).(*minHeapNode) + + if v.userExpire != time.Duration(i)*time.Second { + t.Errorf("index(%d) v.userExpire(%v) != %v", i, v.userExpire, time.Duration(i)*time.Second) + } + } + }) +} diff --git a/min_heap_test.go b/min_heap_test.go index fffe566..1d37960 100644 --- a/min_heap_test.go +++ b/min_heap_test.go @@ -10,9 +10,7 @@ import ( // 测试AfterFunc有没有运行以及时间间隔可对 func Test_MinHeap_AfterFunc_Run(t *testing.T) { - t.Run("1ms", func(t *testing.T) { - tm := NewTimer(WithMinHeap()) now := time.Now() go tm.Run() @@ -37,7 +35,6 @@ func Test_MinHeap_AfterFunc_Run(t *testing.T) { } } assert.Equal(t, atomic.LoadInt32(&count), int32(2)) - }) t.Run("10ms", func(t *testing.T) { @@ -64,10 +61,9 @@ func Test_MinHeap_AfterFunc_Run(t *testing.T) { if tv < left || tv > right { t.Errorf("index(%d) (%v)tc < %v || tc > %v", cnt, tv, left, right) } - //cnt++ + // cnt++ } assert.Equal(t, atomic.LoadInt32(&count), int32(2)) - }) t.Run("90ms", func(t *testing.T) { @@ -75,20 +71,16 @@ func Test_MinHeap_AfterFunc_Run(t *testing.T) { go tm.Run() count := int32(0) tm.AfterFunc(time.Millisecond*90, func() { atomic.AddInt32(&count, 1) }) - tm.AfterFunc(time.Millisecond*90, func() { atomic.AddInt32(&count, 1) }) + tm.AfterFunc(time.Millisecond*90, func() { atomic.AddInt32(&count, 2) }) time.Sleep(time.Millisecond * 180) - assert.Equal(t, atomic.LoadInt32(&count), int32(2)) - + assert.Equal(t, atomic.LoadInt32(&count), int32(3)) }) - } // 测试Schedule 运行的周期可对 func Test_MinHeap_ScheduleFunc_Run(t *testing.T) { - t.Run("1ms", func(t *testing.T) { - tm := NewTimer(WithMinHeap()) go tm.Run() count := int32(0) @@ -108,7 +100,6 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) { time.Sleep(time.Millisecond * 5) assert.Equal(t, atomic.LoadInt32(&count), int32(2)) - }) t.Run("10ms", func(t *testing.T) { @@ -121,7 +112,7 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) { node := tm.ScheduleFunc(time.Millisecond*10, func() { atomic.AddInt32(&count, 1) tc <- time.Since(now) - if atomic.LoadInt32(&count) == 2 { + if atomic.LoadInt32(&count) >= 2 { c <- true } }) @@ -143,7 +134,6 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) { cnt++ } assert.Equal(t, atomic.LoadInt32(&count), int32(2)) - }) t.Run("30ms", func(t *testing.T) { @@ -159,14 +149,12 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) { } }) go func() { - <-c node.Stop() }() time.Sleep(time.Millisecond * 70) assert.Equal(t, atomic.LoadInt32(&count), int32(2)) - }) } @@ -187,34 +175,35 @@ func Test_Run_Stop(t *testing.T) { } type curstomTest struct { - count int + count int32 } func (c *curstomTest) Next(now time.Time) (rv time.Time) { rv = now.Add(time.Duration(c.count) * time.Millisecond * 10) - c.count++ + atomic.AddInt32(&c.count, 1) return } // 验证自定义函数的运行间隔时间 func Test_CustomFunc(t *testing.T) { t.Run("custom", func(t *testing.T) { - tm := NewTimer(WithMinHeap()) mh := tm.(*minHeap) tc := make(chan time.Duration, 2) now := time.Now() count := uint32(1) + stop := make(chan bool, 1) node := tm.CustomFunc(&curstomTest{count: 1}, func() { if atomic.LoadUint32(&count) == 2 { return } atomic.AddUint32(&count, 1) tc <- time.Since(now) + close(stop) }) go func() { - time.Sleep(time.Millisecond * 30) + <-stop node.Stop() tm.Stop() }() @@ -231,7 +220,7 @@ func Test_CustomFunc(t *testing.T) { cnt++ } assert.Equal(t, atomic.LoadUint32(&count), uint32(2)) - assert.Equal(t, mh.runCount, uint32(3)) + assert.Equal(t, mh.runCount, uint32(1)) }) } @@ -246,7 +235,6 @@ func Test_RunCount(t *testing.T) { count := uint32(0) for i := 0; i < max; i++ { - tm.ScheduleFunc(time.Millisecond*10, func() { atomic.AddUint32(&count, 1) })