Skip to content

Commit

Permalink
Merge pull request #12 from antlabs/dev
Browse files Browse the repository at this point in the history
优化
  • Loading branch information
guonaihong authored Jan 14, 2024
2 parents 20143a7 + 545e97f commit 78c087c
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 78 deletions.
1 change: 1 addition & 0 deletions autobahn/autobahn-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (h *handler) echoRunStream(w http.ResponseWriter, r *http.Request) {
}
_ = c
}

func main() {
flag.Parse()

Expand Down
18 changes: 8 additions & 10 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ type Config struct {
disableBufioClearHack bool // 关闭bufio的clear hack优化
utf8Check func([]byte) bool // utf8检查
readTimeout time.Duration
windowsMultipleTimesPayloadSize float32 // 设置几倍(1024+14)的payload大小
// parseMode parseMode // 解析模式, TODO
maxDelayWriteNum int32 // 最大延迟包的个数, 默认值为10
delayWriteInitBufferSize int32 // 延迟写入的初始缓冲区大小, 默认值是8k
maxDelayWriteDuration time.Duration // 最大延迟时间, 默认值是10ms
subProtocols []string // 设置支持的子协议
multiEventLoop *MultiEventLoop
callbackInEventLoop bool // 在event loop中运行websocket OnOpen, OnMessage, OnClose 回调函数
runInGoStrategy taskStrategy
windowsMultipleTimesPayloadSize float32 // 设置几倍(1024+14)的payload大小
maxDelayWriteNum int32 // 最大延迟包的个数, 默认值为10
delayWriteInitBufferSize int32 // 延迟写入的初始缓冲区大小, 默认值是8k
maxDelayWriteDuration time.Duration // 最大延迟时间, 默认值是10ms
subProtocols []string // 设置支持的子协议
multiEventLoop *MultiEventLoop
callbackInEventLoop bool // 在event loop中运行websocket OnOpen, OnMessage, OnClose 回调函数
runInGoStrategy taskStrategy
}

func (c *Config) useIoUring() bool {
Expand All @@ -58,7 +57,6 @@ func (c *Config) defaultSetting() {
c.maxDelayWriteDuration = 10 * time.Millisecond
c.runInGoStrategy = taskStrategyBind
c.tcpNoDelay = true
// c.parseMode = ParseModeWindows
// 对于text消息,默认不检查text是utf8字符
c.utf8Check = func(b []byte) bool { return true }
}
2 changes: 1 addition & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *Conn) addTask(ts taskStrategy, f func() bool) {
return
}
if ts == taskStrategyStream {
c.taskStream.addTask(ts, f)
c.streamGo.addTask(ts, f)
return
}

Expand Down
17 changes: 12 additions & 5 deletions conn_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type Conn struct {
closed int32 // 是否关闭
closeOnce sync.Once
parent *EventLoop
currBindGo *businessGo
taskStream taskStream
currBindGo *businessGo // 绑定模式下,当前绑定的go程
streamGo taskStream // stream模式下,当前绑定的go程
}

func newConn(fd int64, client bool, conf *Config) *Conn {
Expand All @@ -102,7 +102,7 @@ func newConn(fd int64, client bool, conf *Config) *Conn {
}

if conf.runInGoStrategy == taskStrategyStream {
c.taskStream.init()
c.streamGo.init()
}
return c
}
Expand Down Expand Up @@ -130,9 +130,16 @@ func (c *Conn) closeWithLock(err error) {
return
}

if c.Config.runInGoStrategy == taskStrategyStream {
c.taskStream.close()
switch c.Config.runInGoStrategy {
case taskStrategyBind:
if c.currBindGo != nil {
c.currBindGo.subBinConnCount()
}
case taskStrategyStream:
c.streamGo.close()

}

c.closeInner(err)

c.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion event_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func CreateEventLoop(setSize int, flag evFlag, parent *MultiEventLoop) (e *Event
parent: parent,
}
e.localTask.taskConfig = e.parent.globalTask.taskConfig
// e.localTask.initWithNoMutex()
e.localTask.taskMode = e.parent.globalTask.taskMode
e.localTask.init()
err = e.apiCreate(flag)
return e, err
Expand Down
14 changes: 10 additions & 4 deletions multi_event_loops.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,18 @@ type MultiEventLoop struct {
level slog.Level
stat // 统计信息
*slog.Logger
taskMode taskMode
}

var (
defMaxEventNum = 256
defTaskMin = 50
defTaskMax = 30000
defTaskInitCount = 1000
defNumLoops = runtime.NumCPU() / 4
defNumLoops = runtime.NumCPU()
)

// 这个函数会被调用两次
func (m *MultiEventLoop) initDefaultSetting() {
m.level = slog.LevelError // 默认打印error级别的日志
if m.numLoops == 0 {
Expand All @@ -51,19 +53,19 @@ func (m *MultiEventLoop) initDefaultSetting() {
}

if m.globalTask.min == 0 {
m.globalTask.min = max(defTaskMin/(m.numLoops+1), 1)
m.globalTask.min = defTaskMin
} else {
m.globalTask.min = max(m.globalTask.min/(m.numLoops+1), 1)
}

if m.globalTask.max == 0 {
m.globalTask.max = max(defTaskMax/(m.numLoops+1), 1)
m.globalTask.max = defTaskMax
} else {
m.globalTask.max = max(m.globalTask.max/(m.numLoops+1), 1)
}

if m.globalTask.initCount == 0 {
m.globalTask.initCount = max(defTaskInitCount/(m.numLoops+1), 1)
m.globalTask.initCount = defTaskInitCount
} else {
m.globalTask.initCount = max(m.globalTask.initCount/(m.numLoops+1), 1)
}
Expand Down Expand Up @@ -92,6 +94,10 @@ func NewMultiEventLoop(opts ...EvOption) (e *MultiEventLoop, err error) {
o(m)
}
m.initDefaultSetting()

// 设置任务池模式(tps, 或者流量模式)
m.globalTask.taskMode = m.taskMode

m.globalTask.init()

m.loops = make([]*EventLoop, m.numLoops)
Expand Down
18 changes: 18 additions & 0 deletions multi_event_loops_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,30 @@ func WithEventLoops(num int) EvOption {
// max: 最大协程数
func WithBusinessGoNum(initCount, min, max int) EvOption {
return func(e *MultiEventLoop) {
if initCount <= 0 {
initCount = defTaskInitCount
}

if min <= 0 {
min = defTaskMin
}

if max <= 0 {
max = defTaskMax
}
e.globalTask.initCount = initCount
e.globalTask.min = min
e.globalTask.max = max
}
}

// 设置business go程池 对流量压测友好的模式
func WithBusinessGoTrafficMode() EvOption {
return func(e *MultiEventLoop) {
e.taskMode = trafficMode
}
}

// 设置日志级别
func WithLogLevel(level slog.Level) EvOption {
return func(e *MultiEventLoop) {
Expand Down
15 changes: 14 additions & 1 deletion stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,36 @@ type stat struct {

// 对外接口,查询当前业务协程池个数
func (m *MultiEventLoop) GetCurGoNum() (total int) {
// 全局go程数
total += int(m.globalTask.getCurGo())
for _, v := range m.loops {
// 本地任务数
total += int(v.localTask.getCurGo())
}
return
}

// 对外接口,查询业务协程池-全局池运行的当前业务数
func (m *MultiEventLoop) GetGlobalCurGoNum() int {
return int(m.globalTask.getCurGo())
}

// 对外接口,查询业务协程池运行的当前业务数
func (m *MultiEventLoop) GetCurTaskNum() (total int64) {

/// 全局任务数
total += m.globalTask.getCurTask()
for _, v := range m.loops {
// 本地任务数
total += v.localTask.getCurTask()
}
return
}

// 对外接口,查询业务协程池-全局池运行的当前业务数
func (m *MultiEventLoop) GetGlobalTaskNum() int {
return int(m.globalTask.getCurTask())
}

// 对外接口,查询移动字节数
func (m *MultiEventLoop) GetMoveBytesNum() uint64 {
return atomic.LoadUint64(&m.moveBytes)
Expand Down
Loading

0 comments on commit 78c087c

Please sign in to comment.