Skip to content

Commit

Permalink
fixbug
Browse files Browse the repository at this point in the history
  • Loading branch information
guonaihong committed Aug 15, 2023
1 parent 8c3abca commit b69861f
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 39 deletions.
14 changes: 6 additions & 8 deletions min_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type minHeap struct {
ctx context.Context
cancel context.CancelFunc
wait sync.WaitGroup
runCount uint32 //测试时使用
runCount uint32 // 测试时使用
}

// 一次性定时器
Expand Down Expand Up @@ -60,14 +60,13 @@ func (m *minHeap) addCallback(expire time.Duration, n Next, callback func(), isS
node.absExpire = n.Next(time.Now())
}

heap.Push(&m.minHeaps, node)
heap.Push(&m.minHeaps, &node)
select {
case m.chAdd <- struct{}{}:
default:
}

return &node

}

func (m *minHeap) removeTimeNode(node *minHeapNode) {
Expand All @@ -84,7 +83,6 @@ func (m *minHeap) removeTimeNode(node *minHeapNode) {
// 运行
// 为了避免空转cpu, 会等待一个chan, 只要AfterFunc或者ScheduleFunc被调用就会往这个chan里面写值
func (m *minHeap) Run() {

timeout := time.Hour
tm := time.NewTimer(timeout)
for {
Expand All @@ -100,7 +98,7 @@ func (m *minHeap) Run() {
}

for {
first := &m.minHeaps[0]
first := m.minHeaps[0]

// 时间未到直接过滤掉
if !now.After(first.absExpire) {
Expand All @@ -112,8 +110,9 @@ func (m *minHeap) Run() {
first.absExpire = first.Next(now)
heap.Fix(&m.minHeaps, first.index)
} else {
m.minHeaps.Pop()
heap.Pop(&m.minHeaps)
}
atomic.AddUint32(&m.runCount, 1)
go callback()

if m.minHeaps.Len() == 0 {
Expand All @@ -123,7 +122,7 @@ func (m *minHeap) Run() {
}
}

first := &m.minHeaps[0]
first := m.minHeaps[0]
if time.Now().Before(first.absExpire) {
to := time.Duration(math.Abs(float64(time.Since(m.minHeaps[0].absExpire))))
tm.Reset(to)
Expand All @@ -146,7 +145,6 @@ func (m *minHeap) Run() {
return
}
next:
atomic.AddUint32(&m.runCount, 1)
}
}

Expand Down
20 changes: 11 additions & 9 deletions min_heap_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
)

type minHeapNode struct {
callback func() //用户的callback
absExpire time.Time //绝对时间
userExpire time.Duration //过期时间段
next Next //自定义下个触发的时间点
isSchedule bool //是否是周期性任务
index int //在min heap中的索引,方便删除或者重新推入堆中
callback func() // 用户的callback
absExpire time.Time // 绝对时间
userExpire time.Duration // 过期时间段
next Next // 自定义下个触发的时间点
isSchedule bool // 是否是周期性任务
index int // 在min heap中的索引,方便删除或者重新推入堆中
root *minHeap
}

Expand All @@ -25,10 +25,12 @@ func (m *minHeapNode) Next(now time.Time) time.Time {
return now.Add(m.userExpire)
}

type minHeaps []minHeapNode
type minHeaps []*minHeapNode

func (m minHeaps) Len() int { return len(m) }

func (m minHeaps) Len() int { return len(m) }
func (m minHeaps) Less(i, j int) bool { return m[i].absExpire.Before(m[j].absExpire) }

func (m minHeaps) Swap(i, j int) {
m[i], m[j] = m[j], m[i]
m[i].index = i
Expand All @@ -38,7 +40,7 @@ func (m minHeaps) Swap(i, j int) {
func (m *minHeaps) Push(x any) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*m = append(*m, x.(minHeapNode))
*m = append(*m, x.(*minHeapNode))
lastIndex := len(*m) - 1
(*m)[lastIndex].index = lastIndex
}
Expand Down
55 changes: 55 additions & 0 deletions min_heap_node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package timer

import (
"container/heap"
"testing"
"time"
)

func Test_MinHeap(t *testing.T) {
t.Run("", func(t *testing.T) {
var mh minHeaps
now := time.Now()
n1 := minHeapNode{
absExpire: now.Add(time.Second),
userExpire: 1 * time.Second,
}

n2 := minHeapNode{
absExpire: now.Add(2 * time.Second),
userExpire: 2 * time.Second,
}

n3 := minHeapNode{
absExpire: now.Add(3 * time.Second),
userExpire: 3 * time.Second,
}

n6 := minHeapNode{
absExpire: now.Add(6 * time.Second),
userExpire: 6 * time.Second,
}
n5 := minHeapNode{
absExpire: now.Add(5 * time.Second),
userExpire: 5 * time.Second,
}
n4 := minHeapNode{
absExpire: now.Add(4 * time.Second),
userExpire: 4 * time.Second,
}
mh.Push(&n1)
mh.Push(&n2)
mh.Push(&n3)
mh.Push(&n6)
mh.Push(&n5)
mh.Push(&n4)

for i := 1; len(mh) > 0; i++ {
v := heap.Pop(&mh).(*minHeapNode)

if v.userExpire != time.Duration(i)*time.Second {
t.Errorf("index(%d) v.userExpire(%v) != %v", i, v.userExpire, time.Duration(i)*time.Second)
}
}
})
}
32 changes: 10 additions & 22 deletions min_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ import (

// 测试AfterFunc有没有运行以及时间间隔可对
func Test_MinHeap_AfterFunc_Run(t *testing.T) {

t.Run("1ms", func(t *testing.T) {

tm := NewTimer(WithMinHeap())
now := time.Now()
go tm.Run()
Expand All @@ -37,7 +35,6 @@ func Test_MinHeap_AfterFunc_Run(t *testing.T) {
}
}
assert.Equal(t, atomic.LoadInt32(&count), int32(2))

})

t.Run("10ms", func(t *testing.T) {
Expand All @@ -64,31 +61,26 @@ func Test_MinHeap_AfterFunc_Run(t *testing.T) {
if tv < left || tv > right {
t.Errorf("index(%d) (%v)tc < %v || tc > %v", cnt, tv, left, right)
}
//cnt++
// cnt++
}
assert.Equal(t, atomic.LoadInt32(&count), int32(2))

})

t.Run("90ms", func(t *testing.T) {
tm := NewTimer(WithMinHeap())
go tm.Run()
count := int32(0)
tm.AfterFunc(time.Millisecond*90, func() { atomic.AddInt32(&count, 1) })
tm.AfterFunc(time.Millisecond*90, func() { atomic.AddInt32(&count, 1) })
tm.AfterFunc(time.Millisecond*90, func() { atomic.AddInt32(&count, 2) })

time.Sleep(time.Millisecond * 180)
assert.Equal(t, atomic.LoadInt32(&count), int32(2))

assert.Equal(t, atomic.LoadInt32(&count), int32(3))
})

}

// 测试Schedule 运行的周期可对
func Test_MinHeap_ScheduleFunc_Run(t *testing.T) {

t.Run("1ms", func(t *testing.T) {

tm := NewTimer(WithMinHeap())
go tm.Run()
count := int32(0)
Expand All @@ -108,7 +100,6 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) {

time.Sleep(time.Millisecond * 5)
assert.Equal(t, atomic.LoadInt32(&count), int32(2))

})

t.Run("10ms", func(t *testing.T) {
Expand All @@ -121,7 +112,7 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) {
node := tm.ScheduleFunc(time.Millisecond*10, func() {
atomic.AddInt32(&count, 1)
tc <- time.Since(now)
if atomic.LoadInt32(&count) == 2 {
if atomic.LoadInt32(&count) >= 2 {
c <- true
}
})
Expand All @@ -143,7 +134,6 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) {
cnt++
}
assert.Equal(t, atomic.LoadInt32(&count), int32(2))

})

t.Run("30ms", func(t *testing.T) {
Expand All @@ -159,14 +149,12 @@ func Test_MinHeap_ScheduleFunc_Run(t *testing.T) {
}
})
go func() {

<-c
node.Stop()
}()

time.Sleep(time.Millisecond * 70)
assert.Equal(t, atomic.LoadInt32(&count), int32(2))

})
}

Expand All @@ -187,34 +175,35 @@ func Test_Run_Stop(t *testing.T) {
}

type curstomTest struct {
count int
count int32
}

func (c *curstomTest) Next(now time.Time) (rv time.Time) {
rv = now.Add(time.Duration(c.count) * time.Millisecond * 10)
c.count++
atomic.AddInt32(&c.count, 1)
return
}

// 验证自定义函数的运行间隔时间
func Test_CustomFunc(t *testing.T) {
t.Run("custom", func(t *testing.T) {

tm := NewTimer(WithMinHeap())
mh := tm.(*minHeap)
tc := make(chan time.Duration, 2)
now := time.Now()
count := uint32(1)
stop := make(chan bool, 1)
node := tm.CustomFunc(&curstomTest{count: 1}, func() {
if atomic.LoadUint32(&count) == 2 {
return
}
atomic.AddUint32(&count, 1)
tc <- time.Since(now)
close(stop)
})

go func() {
time.Sleep(time.Millisecond * 30)
<-stop
node.Stop()
tm.Stop()
}()
Expand All @@ -231,7 +220,7 @@ func Test_CustomFunc(t *testing.T) {
cnt++
}
assert.Equal(t, atomic.LoadUint32(&count), uint32(2))
assert.Equal(t, mh.runCount, uint32(3))
assert.Equal(t, mh.runCount, uint32(1))
})
}

Expand All @@ -246,7 +235,6 @@ func Test_RunCount(t *testing.T) {

count := uint32(0)
for i := 0; i < max; i++ {

tm.ScheduleFunc(time.Millisecond*10, func() {
atomic.AddUint32(&count, 1)
})
Expand Down

0 comments on commit b69861f

Please sign in to comment.