Skip to content

Commit

Permalink
Merge pull request #13 from antlabs/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
guonaihong authored Jan 20, 2024
2 parents 78c087c + 9a16121 commit 69d4d99
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 133 deletions.
6 changes: 3 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ type Config struct {
runInGoStrategy taskStrategy
}

func (c *Config) useIoUring() bool {
return c.multiEventLoop.flag == EVENT_IOURING
}
// func (c *Config) useIoUring() bool {
// return c.multiEventLoop.flag == EVENT_IOURING
// }

func (c *Config) initPayloadSize() int {
return int((1024.0 + float32(enum.MaxFrameHeaderSize)) * c.windowsMultipleTimesPayloadSize)
Expand Down
51 changes: 39 additions & 12 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/rand"
"sync/atomic"
"time"
"unsafe"

"github.com/antlabs/wsutil/bytespool"
"github.com/antlabs/wsutil/enum"
Expand Down Expand Up @@ -75,6 +76,14 @@ func (c *Conn) getLogger() *slog.Logger {
return c.multiEventLoop.Logger
}

// 获取当前绑定的go程
func (c *Conn) getCurrBindGo() *businessGo {
return (*businessGo)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&c.currBindGo))))
}

func (c *Conn) setCurrBindGo(b *businessGo) {
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&c.currBindGo)), unsafe.Pointer(b))
}
func (c *Conn) addTask(ts taskStrategy, f func() bool) {
if c.isClosed() {
return
Expand All @@ -89,11 +98,33 @@ func (c *Conn) addTask(ts taskStrategy, f func() bool) {
return
}

if err := c.parent.localTask.addTask(c, ts, f); err == ErrTaskQueueFull {
if err = c.multiEventLoop.globalTask.addTask(c, ts, f); err == ErrTaskQueueFull {
// TODO
var err error
retry := 2
if c.parent.localTask.isFull() {
retry = 1
}

for i := 0; i < retry; i++ {
err = c.parent.localTask.addTask(c, ts, f)
if err == nil || retry == 1 {
break
}
if err == ErrTaskQueueFull {
if c.parent.localTask.addGoWithSteal(c.getCurrBindGo()) {
c.parent.localTask.rebindGo(c)
}
continue
}

}

if err == ErrTaskQueueFull {
// 负载高走自己专用chan
c.getCurrBindGo().taskChan <- f
// 负载低走公共chan, TODO, 需要找一个判断高/低雷负载的条件
// c.parent.localTask.public <- f
}

}

func (c *Conn) getFd() int {
Expand Down Expand Up @@ -572,15 +603,11 @@ func (c *Conn) WriteMessage(op Opcode, writeBuf []byte) (err error) {
}

var fw fixedwriter.FixedWriter
// 没有使用io_uring
if !c.useIoUring() {
c.mu.Lock()
err = frame.WriteFrame(&fw, c, writeBuf, true, rsv1, c.client, op, maskValue)
c.mu.Unlock()
} else {
// 使用io_uring
// TODO
}

c.mu.Lock()
err = frame.WriteFrame(&fw, c, writeBuf, true, rsv1, c.client, op, maskValue)
c.mu.Unlock()

return err
}

Expand Down
5 changes: 3 additions & 2 deletions conn_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (c *Conn) closeWithLock(err error) {

switch c.Config.runInGoStrategy {
case taskStrategyBind:
if c.currBindGo != nil {
if c.getCurrBindGo() != nil {
c.currBindGo.subBinConnCount()
}
case taskStrategyStream:
Expand Down Expand Up @@ -309,7 +309,8 @@ func (c *Conn) flushOrClose() (err error) {
// 2. 缓冲区数据不够,并且一次性读取了多个frame
func (c *Conn) processWebsocketFrame() (n int, err error) {
// 1. 处理frame header
if !c.useIoUring() {
// if !c.useIoUring() {
if true {
// 不使用io_uring的直接调用read获取buffer数据
for i := 0; ; i++ {
fd := atomic.LoadInt64(&c.fd)
Expand Down
4 changes: 2 additions & 2 deletions event_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func CreateEventLoop(setSize int, flag evFlag, parent *MultiEventLoop) (e *Event
maxFd: -1,
parent: parent,
}
e.localTask.taskConfig = e.parent.globalTask.taskConfig
e.localTask.taskMode = e.parent.globalTask.taskMode
e.localTask.taskConfig = e.parent.configTask.taskConfig
e.localTask.taskMode = e.parent.configTask.taskMode
e.localTask.init()
err = e.apiCreate(flag)
return e, err
Expand Down
35 changes: 19 additions & 16 deletions multi_event_loops.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ type MultiEventLoop struct {
numLoops int // 事件循环数量
maxEventNum int
loops []*EventLoop
globalTask task
runInIo taskIo
flag evFlag // 是否使用io_uring
level slog.Level
stat // 统计信息
// 只是配置的作用,不是真正的任务池, fd是绑定到某个事件循环上的,
// 任务池是绑定到某个事件循环上的,所以这里的任务池也绑定到对应的localTask上
// 如果设计全局任务池,那么概念就会很乱,容易出错,也会临界区竞争
configTask task
runInIo taskIo
flag evFlag // 是否使用io_uring
level slog.Level
stat // 统计信息
*slog.Logger
taskMode taskMode
}
Expand All @@ -52,22 +55,22 @@ func (m *MultiEventLoop) initDefaultSetting() {
m.maxEventNum = defMaxEventNum
}

if m.globalTask.min == 0 {
m.globalTask.min = defTaskMin
if m.configTask.min == 0 {
m.configTask.min = defTaskMin
} else {
m.globalTask.min = max(m.globalTask.min/(m.numLoops+1), 1)
m.configTask.min = max(m.configTask.min/(m.numLoops), 1)
}

if m.globalTask.max == 0 {
m.globalTask.max = defTaskMax
if m.configTask.max == 0 {
m.configTask.max = defTaskMax
} else {
m.globalTask.max = max(m.globalTask.max/(m.numLoops+1), 1)
m.configTask.max = max(m.configTask.max/(m.numLoops), 1)
}

if m.globalTask.initCount == 0 {
m.globalTask.initCount = defTaskInitCount
if m.configTask.initCount == 0 {
m.configTask.initCount = defTaskInitCount
} else {
m.globalTask.initCount = max(m.globalTask.initCount/(m.numLoops+1), 1)
m.configTask.initCount = max(m.configTask.initCount/(m.numLoops), 1)
}

if m.flag == 0 {
Expand Down Expand Up @@ -96,9 +99,9 @@ func NewMultiEventLoop(opts ...EvOption) (e *MultiEventLoop, err error) {
m.initDefaultSetting()

// 设置任务池模式(tps, 或者流量模式)
m.globalTask.taskMode = m.taskMode
m.configTask.taskMode = m.taskMode

m.globalTask.init()
m.configTask.init()

m.loops = make([]*EventLoop, m.numLoops)

Expand Down
6 changes: 3 additions & 3 deletions multi_event_loops_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func WithBusinessGoNum(initCount, min, max int) EvOption {
if max <= 0 {
max = defTaskMax
}
e.globalTask.initCount = initCount
e.globalTask.min = min
e.globalTask.max = max
e.configTask.initCount = initCount
e.configTask.min = min
e.configTask.max = max
}
}

Expand Down
18 changes: 0 additions & 18 deletions stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,42 +27,24 @@ type stat struct {
pollEv int64 // poll事件次数, 包含读,写, 错误事件
}

// func (m *MultiEventLoop) HighLoad() bool {
// return m.globalTask.highLoad()
// }

// 对外接口,查询当前业务协程池个数
func (m *MultiEventLoop) GetCurGoNum() (total int) {
// 全局go程数
total += int(m.globalTask.getCurGo())
for _, v := range m.loops {
// 本地任务数
total += int(v.localTask.getCurGo())
}
return
}

// 对外接口,查询业务协程池-全局池运行的当前业务数
func (m *MultiEventLoop) GetGlobalCurGoNum() int {
return int(m.globalTask.getCurGo())
}

// 对外接口,查询业务协程池运行的当前业务数
func (m *MultiEventLoop) GetCurTaskNum() (total int64) {
/// 全局任务数
total += m.globalTask.getCurTask()
for _, v := range m.loops {
// 本地任务数
total += v.localTask.getCurTask()
}
return
}

// 对外接口,查询业务协程池-全局池运行的当前业务数
func (m *MultiEventLoop) GetGlobalTaskNum() int {
return int(m.globalTask.getCurTask())
}

// 对外接口,查询移动字节数
func (m *MultiEventLoop) GetMoveBytesNum() uint64 {
return atomic.LoadUint64(&m.moveBytes)
Expand Down
Loading

0 comments on commit 69d4d99

Please sign in to comment.