From 7d24eccd3088076a8fb571b7426975283aaea30c Mon Sep 17 00:00:00 2001 From: guonaihong Date: Fri, 12 Jan 2024 00:59:26 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conn_unix.go | 6 +++++- event_loop.go | 2 +- multi_event_loops.go | 9 +++++---- multi_event_loops_option.go | 11 +++++++++++ stat.go | 15 ++++++++++++++- task.go | 31 +------------------------------ task_business_go.go | 4 ++-- task_window.go | 8 ++++++-- 8 files changed, 45 insertions(+), 41 deletions(-) diff --git a/conn_unix.go b/conn_unix.go index cba7c2e..cf431ed 100644 --- a/conn_unix.go +++ b/conn_unix.go @@ -130,9 +130,13 @@ func (c *Conn) closeWithLock(err error) { return } - if c.Config.runInGoStrategy == taskStrategyStream { + switch c.Config.runInGoStrategy { + case taskStrategyBind: + c.currBindGo.subBinConnCount() + case taskStrategyStream: c.taskStream.close() } + c.closeInner(err) c.mu.Unlock() diff --git a/event_loop.go b/event_loop.go index 354615a..c79eff4 100644 --- a/event_loop.go +++ b/event_loop.go @@ -46,7 +46,7 @@ func CreateEventLoop(setSize int, flag evFlag, parent *MultiEventLoop) (e *Event parent: parent, } e.localTask.taskConfig = e.parent.globalTask.taskConfig - // e.localTask.initWithNoMutex() + e.localTask.init() err = e.apiCreate(flag) return e, err diff --git a/multi_event_loops.go b/multi_event_loops.go index 9d107e7..0087ce9 100644 --- a/multi_event_loops.go +++ b/multi_event_loops.go @@ -37,9 +37,10 @@ var ( defTaskMin = 50 defTaskMax = 30000 defTaskInitCount = 1000 - defNumLoops = runtime.NumCPU() / 4 + defNumLoops = runtime.NumCPU() ) +// 这个函数会被调用两次 func (m *MultiEventLoop) initDefaultSetting() { m.level = slog.LevelError // 默认打印error级别的日志 if m.numLoops == 0 { @@ -51,19 +52,19 @@ func (m *MultiEventLoop) initDefaultSetting() { } if m.globalTask.min == 0 { - m.globalTask.min = max(defTaskMin/(m.numLoops+1), 1) + m.globalTask.min = defTaskMin } else { m.globalTask.min = max(m.globalTask.min/(m.numLoops+1), 1) } if m.globalTask.max == 0 { - m.globalTask.max = max(defTaskMax/(m.numLoops+1), 1) + m.globalTask.max = defTaskMax } else { m.globalTask.max = max(m.globalTask.max/(m.numLoops+1), 1) } if m.globalTask.initCount == 0 { - m.globalTask.initCount = max(defTaskInitCount/(m.numLoops+1), 1) + m.globalTask.initCount = defTaskInitCount } else { m.globalTask.initCount = max(m.globalTask.initCount/(m.numLoops+1), 1) } diff --git a/multi_event_loops_option.go b/multi_event_loops_option.go index 3edb30c..8562488 100644 --- a/multi_event_loops_option.go +++ b/multi_event_loops_option.go @@ -30,6 +30,17 @@ func WithEventLoops(num int) EvOption { // max: 最大协程数 func WithBusinessGoNum(initCount, min, max int) EvOption { return func(e *MultiEventLoop) { + if initCount <= 0 { + initCount = defTaskInitCount + } + + if min <= 0 { + min = defTaskMin + } + + if max <= 0 { + max = defTaskMax + } e.globalTask.initCount = initCount e.globalTask.min = min e.globalTask.max = max diff --git a/stat.go b/stat.go index 18a7393..986c5fd 100644 --- a/stat.go +++ b/stat.go @@ -33,23 +33,36 @@ type stat struct { // 对外接口,查询当前业务协程池个数 func (m *MultiEventLoop) GetCurGoNum() (total int) { + // 全局go程数 total += int(m.globalTask.getCurGo()) for _, v := range m.loops { + // 本地任务数 total += int(v.localTask.getCurGo()) } return } +// 对外接口,查询业务协程池-全局池运行的当前业务数 +func (m *MultiEventLoop) GetGlobalCurGoNum() int { + return int(m.globalTask.getCurGo()) +} + // 对外接口,查询业务协程池运行的当前业务数 func (m *MultiEventLoop) GetCurTaskNum() (total int64) { - + /// 全局任务数 total += m.globalTask.getCurTask() for _, v := range m.loops { + // 本地任务数 total += v.localTask.getCurTask() } return } +// 对外接口,查询业务协程池-全局池运行的当前业务数 +func (m *MultiEventLoop) GetGlobalTaskNum() int { + return int(m.globalTask.getCurTask()) +} + // 对外接口,查询移动字节数 func (m *MultiEventLoop) GetMoveBytesNum() uint64 { return atomic.LoadUint64(&m.moveBytes) diff --git a/task.go b/task.go index bc07037..6d7447d 100644 --- a/task.go +++ b/task.go @@ -99,34 +99,6 @@ func (t *task) consumer() { } } -// func (t *task) highLoad() bool { -// // curGo := atomic.LoadInt64(&t.curGo) -// for i := 1; ; i++ { - -// curTask := atomic.LoadInt64(&t.curTask) - -// if curTask >= int64(t.curGo) { -// return true -// } - -// // 这里的判断条件不准确,因为curGo是表示go程多少,不能表示任务多少, 比如1w上go程,一个任务也不跑 -// // if curGo := atomic.LoadInt64(&t.curGo); curGo > int64(t.max) { -// // return true -// // } - -// if need, _ := t.needGrow(); !need { -// return false -// } - -// curGo := atomic.LoadInt64(&t.curGo) -// maxGo := int64(t.max) -// need := min(2*i, max(0, int(maxGo-curGo))) -// if need > 0 { -// t.addGoNum(need) -// } -// } -// } - // 随机获取一个go程 func (t *task) randomGo() *businessGo { var currBusinessGo *businessGo @@ -137,13 +109,12 @@ func (t *task) randomGo() *businessGo { return currBusinessGo } -// 新增任务, 如果任务队列满了, 新增go程, 这可能会导致协程数超过最大值, 为了防止死锁,还是需要新增业务go程 -// 在io线程里面会判断go程池是否高负载,如果是高负载,会取消read的任务, 放到wbuf里面, 延后再处理 func (t *task) addTask(c *Conn, ts taskStrategy, f func() bool) error { if ts == taskStrategyBind { if c.currBindGo == nil { c.currBindGo = t.randomGo() + c.currBindGo.addBinConnCount() } currChan := c.currBindGo.taskChan // 如果任务未满,直接放入任务队列 diff --git a/task_business_go.go b/task_business_go.go index b18d1b6..7bb0e25 100644 --- a/task_business_go.go +++ b/task_business_go.go @@ -22,12 +22,12 @@ type businessGo struct { } // 新增绑定的conn数 -func (b *businessGo) addBinConnCount(f func() bool) { +func (b *businessGo) addBinConnCount() { atomic.AddInt64(&b.bindConnCount, 1) } // 减少绑定的conn数 -func (b *businessGo) subBinConnCount(f func() bool) { +func (b *businessGo) subBinConnCount() { atomic.AddInt64(&b.bindConnCount, -1) } diff --git a/task_window.go b/task_window.go index c268af3..9bcece0 100644 --- a/task_window.go +++ b/task_window.go @@ -13,6 +13,8 @@ // limitations under the License. package greatws +import "sync/atomic" + // 滑动窗口记录历史go程数 type windows struct { // 历史go程数 @@ -30,11 +32,13 @@ func (w *windows) add(goNum int64) { return } - w.sum += goNum - w.historyGo[w.w] + needAdd := goNum - w.historyGo[w.w] + + atomic.AddInt64(&w.sum, needAdd) w.historyGo[w.w] = goNum w.w = (w.w + 1) % int64(len(w.historyGo)) } func (w *windows) avg() float64 { - return float64(w.sum) / float64(len(w.historyGo)) + return float64(atomic.LoadInt64(&w.sum)) / float64(len(w.historyGo)) } From c40889edc5b452156f08103a55b7d6726b887706 Mon Sep 17 00:00:00 2001 From: guonaihong Date: Fri, 12 Jan 2024 21:40:25 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- autobahn/autobahn-server.go | 1 + conn.go | 2 +- conn_unix.go | 13 +++--- task.go | 84 ++++++++++++++++++++++++++++--------- task_business_go.go | 5 +++ 5 files changed, 79 insertions(+), 26 deletions(-) diff --git a/autobahn/autobahn-server.go b/autobahn/autobahn-server.go index f2e1c10..268686b 100644 --- a/autobahn/autobahn-server.go +++ b/autobahn/autobahn-server.go @@ -123,6 +123,7 @@ func (h *handler) echoRunStream(w http.ResponseWriter, r *http.Request) { } _ = c } + func main() { flag.Parse() diff --git a/conn.go b/conn.go index e97cd66..4598447 100644 --- a/conn.go +++ b/conn.go @@ -85,7 +85,7 @@ func (c *Conn) addTask(ts taskStrategy, f func() bool) { return } if ts == taskStrategyStream { - c.taskStream.addTask(ts, f) + c.streamGo.addTask(ts, f) return } diff --git a/conn_unix.go b/conn_unix.go index cf431ed..1ad612e 100644 --- a/conn_unix.go +++ b/conn_unix.go @@ -84,8 +84,8 @@ type Conn struct { closed int32 // 是否关闭 closeOnce sync.Once parent *EventLoop - currBindGo *businessGo - taskStream taskStream + currBindGo *businessGo // 绑定模式下,当前绑定的go程 + streamGo taskStream // stream模式下,当前绑定的go程 } func newConn(fd int64, client bool, conf *Config) *Conn { @@ -102,7 +102,7 @@ func newConn(fd int64, client bool, conf *Config) *Conn { } if conf.runInGoStrategy == taskStrategyStream { - c.taskStream.init() + c.streamGo.init() } return c } @@ -132,9 +132,12 @@ func (c *Conn) closeWithLock(err error) { switch c.Config.runInGoStrategy { case taskStrategyBind: - c.currBindGo.subBinConnCount() + if c.currBindGo != nil { + c.currBindGo.subBinConnCount() + } case taskStrategyStream: - c.taskStream.close() + c.streamGo.close() + } c.closeInner(err) diff --git a/task.go b/task.go index 6d7447d..0ef3d7a 100644 --- a/task.go +++ b/task.go @@ -18,6 +18,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/antlabs/gstl/cmp" ) // 运行task的策略 @@ -26,11 +28,11 @@ import ( type taskStrategy int const ( - // 随机。默认 + // 送入全局队列,存在的意义主要是用于测试 taskStrategyRandom taskStrategy = iota - // 绑定映射 + // 绑定映射, 从一个go程中取一个conn绑定,后面的请求都会在这个go程中处理 taskStrategyBind - // 流式映射 + // 流式映射,一个conntion绑定一个go程(独占) taskStrategyStream ) @@ -51,12 +53,19 @@ type task struct { curGo int64 // 当前运行协程数 curTask int64 // 当前运行任务数 - allBusinessGo sync.Map // key是[*businessGo, struct{}], 目前先这样。后面再优化下 + mu sync.Mutex + allBusinessGo []*businessGo + id uint32 +} + +func (t *task) nextID() uint32 { + return (atomic.AddUint32(&t.id, 1) - 1) % uint32(len(t.allBusinessGo)) } // 初始化 func (t *task) initInner() { t.c = make(chan func() bool) + t.allBusinessGo = make([]*businessGo, t.initCount) t.windows.init() go t.manageGo() go t.runConsumerLoop() @@ -66,19 +75,29 @@ func (t *task) init() { t.initInner() } -func (t *task) getCurGo() int64 { - return atomic.LoadInt64(&t.curGo) -} +// 收缩go程的slice,直接迁移完。TODO:分摊优化 +func (t *task) sharkAllBusinessGo() { + if len(t.allBusinessGo)/2 > int(t.curGo) { + needSize := cmp.Max(len(t.allBusinessGo)/2, t.min) + newAllBusinessGo := make([]*businessGo, 0, needSize) + for _, v := range t.allBusinessGo { + if v == nil || v.isClose() { + continue + } -func (t *task) getCurTask() int64 { - return atomic.LoadInt64(&t.curTask) + newAllBusinessGo = append(newAllBusinessGo, v) + } + t.allBusinessGo = newAllBusinessGo + } } // 消费者循环 func (t *task) consumer() { defer atomic.AddInt64(&t.curGo, -1) currBusinessGo := newBusinessGo() - t.allBusinessGo.Store(currBusinessGo, struct{}{}) + t.mu.Lock() + t.allBusinessGo = append(t.allBusinessGo, currBusinessGo) + t.mu.Unlock() var f func() bool for { @@ -86,8 +105,13 @@ func (t *task) consumer() { case f = <-t.c: case f = <-currBusinessGo.taskChan: } + atomic.AddInt64(&t.curTask, 1) if b := f(); b { + t.mu.Lock() + t.sharkAllBusinessGo() + t.mu.Unlock() + atomic.AddInt64(&t.curTask, -1) if !currBusinessGo.canKill() { continue @@ -99,22 +123,34 @@ func (t *task) consumer() { } } -// 随机获取一个go程 -func (t *task) randomGo() *businessGo { - var currBusinessGo *businessGo - t.allBusinessGo.Range(func(key, value interface{}) bool { - currBusinessGo = key.(*businessGo) - return false - }) - return currBusinessGo +// 获取一个go程,如果是slice的话,轮询获取 +func (t *task) getGo() *businessGo { + t.mu.Lock() + defer t.mu.Unlock() + for i := 0; i < len(t.allBusinessGo); i++ { + k := t.nextID() + v := t.allBusinessGo[k] + if v == nil { + continue + } + + if v.isClose() { + t.allBusinessGo[k] = nil + continue + } + v.addBinConnCount() + return v + } + + panic("businessgo is nil ") + return nil } func (t *task) addTask(c *Conn, ts taskStrategy, f func() bool) error { if ts == taskStrategyBind { if c.currBindGo == nil { - c.currBindGo = t.randomGo() - c.currBindGo.addBinConnCount() + c.currBindGo = t.getGo() } currChan := c.currBindGo.taskChan // 如果任务未满,直接放入任务队列 @@ -227,3 +263,11 @@ func (t *task) runConsumerLoop() { go t.consumer() } } + +func (t *task) getCurGo() int64 { + return atomic.LoadInt64(&t.curGo) +} + +func (t *task) getCurTask() int64 { + return atomic.LoadInt64(&t.curTask) +} diff --git a/task_business_go.go b/task_business_go.go index 7bb0e25..d53e209 100644 --- a/task_business_go.go +++ b/task_business_go.go @@ -19,6 +19,11 @@ type businessGo struct { taskChan chan func() bool // 被多少conn绑定 bindConnCount int64 + closed uint32 +} + +func (b *businessGo) isClose() bool { + return atomic.LoadUint32(&b.closed) == 1 } // 新增绑定的conn数 From 545e97f8619e22f4ade468304df38691b7d60961 Mon Sep 17 00:00:00 2001 From: guonaihong Date: Sun, 14 Jan 2024 17:55:17 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.go | 18 ++++++++---------- event_loop.go | 2 +- multi_event_loops.go | 5 +++++ multi_event_loops_option.go | 7 +++++++ task.go | 26 +++++++++++++++++++++----- task_business_go.go | 4 ++-- 6 files changed, 44 insertions(+), 18 deletions(-) diff --git a/config.go b/config.go index 39e5b85..63ad1db 100644 --- a/config.go +++ b/config.go @@ -30,15 +30,14 @@ type Config struct { disableBufioClearHack bool // 关闭bufio的clear hack优化 utf8Check func([]byte) bool // utf8检查 readTimeout time.Duration - windowsMultipleTimesPayloadSize float32 // 设置几倍(1024+14)的payload大小 - // parseMode parseMode // 解析模式, TODO - maxDelayWriteNum int32 // 最大延迟包的个数, 默认值为10 - delayWriteInitBufferSize int32 // 延迟写入的初始缓冲区大小, 默认值是8k - maxDelayWriteDuration time.Duration // 最大延迟时间, 默认值是10ms - subProtocols []string // 设置支持的子协议 - multiEventLoop *MultiEventLoop - callbackInEventLoop bool // 在event loop中运行websocket OnOpen, OnMessage, OnClose 回调函数 - runInGoStrategy taskStrategy + windowsMultipleTimesPayloadSize float32 // 设置几倍(1024+14)的payload大小 + maxDelayWriteNum int32 // 最大延迟包的个数, 默认值为10 + delayWriteInitBufferSize int32 // 延迟写入的初始缓冲区大小, 默认值是8k + maxDelayWriteDuration time.Duration // 最大延迟时间, 默认值是10ms + subProtocols []string // 设置支持的子协议 + multiEventLoop *MultiEventLoop + callbackInEventLoop bool // 在event loop中运行websocket OnOpen, OnMessage, OnClose 回调函数 + runInGoStrategy taskStrategy } func (c *Config) useIoUring() bool { @@ -58,7 +57,6 @@ func (c *Config) defaultSetting() { c.maxDelayWriteDuration = 10 * time.Millisecond c.runInGoStrategy = taskStrategyBind c.tcpNoDelay = true - // c.parseMode = ParseModeWindows // 对于text消息,默认不检查text是utf8字符 c.utf8Check = func(b []byte) bool { return true } } diff --git a/event_loop.go b/event_loop.go index c79eff4..cab4fa3 100644 --- a/event_loop.go +++ b/event_loop.go @@ -46,7 +46,7 @@ func CreateEventLoop(setSize int, flag evFlag, parent *MultiEventLoop) (e *Event parent: parent, } e.localTask.taskConfig = e.parent.globalTask.taskConfig - + e.localTask.taskMode = e.parent.globalTask.taskMode e.localTask.init() err = e.apiCreate(flag) return e, err diff --git a/multi_event_loops.go b/multi_event_loops.go index 0087ce9..f5cb0c3 100644 --- a/multi_event_loops.go +++ b/multi_event_loops.go @@ -30,6 +30,7 @@ type MultiEventLoop struct { level slog.Level stat // 统计信息 *slog.Logger + taskMode taskMode } var ( @@ -93,6 +94,10 @@ func NewMultiEventLoop(opts ...EvOption) (e *MultiEventLoop, err error) { o(m) } m.initDefaultSetting() + + // 设置任务池模式(tps, 或者流量模式) + m.globalTask.taskMode = m.taskMode + m.globalTask.init() m.loops = make([]*EventLoop, m.numLoops) diff --git a/multi_event_loops_option.go b/multi_event_loops_option.go index 8562488..25cca38 100644 --- a/multi_event_loops_option.go +++ b/multi_event_loops_option.go @@ -47,6 +47,13 @@ func WithBusinessGoNum(initCount, min, max int) EvOption { } } +// 设置business go程池 对流量压测友好的模式 +func WithBusinessGoTrafficMode() EvOption { + return func(e *MultiEventLoop) { + e.taskMode = trafficMode + } +} + // 设置日志级别 func WithLogLevel(level slog.Level) EvOption { return func(e *MultiEventLoop) { diff --git a/task.go b/task.go index 0ef3d7a..b0d29d5 100644 --- a/task.go +++ b/task.go @@ -15,6 +15,7 @@ package greatws import ( "errors" + "runtime" "sync" "sync/atomic" "time" @@ -46,6 +47,16 @@ type taskConfig struct { max int // 最大协程数 } +// task 模式 +// 1. tps模式 +// 2. 流量模式 +type taskMode int + +const ( + tpsMode taskMode = iota + trafficMode +) + type task struct { c chan func() bool windows windows // 滑动窗口计数,用于判断是否需要新增go程 @@ -53,9 +64,11 @@ type task struct { curGo int64 // 当前运行协程数 curTask int64 // 当前运行任务数 - mu sync.Mutex - allBusinessGo []*businessGo - id uint32 + mu sync.Mutex + allBusinessGo []*businessGo + id uint32 + taskMode taskMode + businessChanNum int } func (t *task) nextID() uint32 { @@ -72,6 +85,10 @@ func (t *task) initInner() { } func (t *task) init() { + t.businessChanNum = runtime.NumCPU() + if t.taskMode == trafficMode { + t.businessChanNum = 1024 + } t.initInner() } @@ -94,7 +111,7 @@ func (t *task) sharkAllBusinessGo() { // 消费者循环 func (t *task) consumer() { defer atomic.AddInt64(&t.curGo, -1) - currBusinessGo := newBusinessGo() + currBusinessGo := newBusinessGo(t.businessChanNum) t.mu.Lock() t.allBusinessGo = append(t.allBusinessGo, currBusinessGo) t.mu.Unlock() @@ -143,7 +160,6 @@ func (t *task) getGo() *businessGo { } panic("businessgo is nil ") - return nil } func (t *task) addTask(c *Conn, ts taskStrategy, f func() bool) error { diff --git a/task_business_go.go b/task_business_go.go index d53e209..debad80 100644 --- a/task_business_go.go +++ b/task_business_go.go @@ -45,8 +45,8 @@ func (b *businessGo) canKill() bool { return curConn == 0 } -func newBusinessGo() *businessGo { +func newBusinessGo(num int) *businessGo { return &businessGo{ - taskChan: make(chan func() bool, 3), + taskChan: make(chan func() bool, num), } }