From 20143a7a1d7004839d16dda799289e8a04244c20 Mon Sep 17 00:00:00 2001 From: guonaihong Date: Tue, 9 Jan 2024 00:51:17 +0800 Subject: [PATCH] Dev (#11) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 优化 * 滑动窗口 * 更新 * 拆分task为全局和本地 * 调整下event loop结构 * 重构两个变量名 * 更新 * 更新 * OnOpen * 更新 * 更新 * 更新 --- api_epoll.go | 108 ++++++++++---- api_iouring.go | 2 +- api_iouring_process.go | 54 +++---- api_kqueue.go | 12 +- api_linux.go | 2 +- autobahn/autobahn-server.go | 51 +++++++ autobahn/config/fuzzingclient.json | 16 ++- callback.go | 2 +- client.go | 2 +- client_options.go | 2 +- common_options.go | 16 ++- config.go | 4 +- conn.go | 33 +++-- conn_unix.go | 73 ++++++---- err.go | 2 +- event_loop.go | 26 +++- multi_event_loops.go | 129 +++++++---------- multi_event_loops_option.go | 19 ++- opcode.go | 2 +- payload_big_pool.go | 13 ++ payload_pool.go | 13 ++ payload_pool_test.go | 13 ++ server_handshake.go | 2 +- server_options.go | 2 +- stat.go | 42 ++++-- status_codes.go | 2 +- task.go | 219 +++++++++++++++++++++-------- task_business_go.go | 47 +++++++ task_io.go | 4 +- task_stream.go | 58 ++++++++ task_window.go | 40 ++++++ task_window_test.go | 36 +++++ tasker.go | 4 +- upgrade.go | 3 +- utils.go | 2 +- 35 files changed, 777 insertions(+), 278 deletions(-) create mode 100644 task_business_go.go create mode 100644 task_stream.go create mode 100644 task_window.go create mode 100644 task_window_test.go diff --git a/api_epoll.go b/api_epoll.go index 1078913..b6c2d89 100644 --- a/api_epoll.go +++ b/api_epoll.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -31,12 +31,51 @@ const ( EPOLLET = 0x80000000 ) +const ( + // 垂直触发 + // 来自man 手册 + // When used as an edge-triggered interface, for performance reasons, + // it is possible to add the file descriptor inside the epoll interface (EPOLL_CTL_ADD) once by specifying (EPOLLIN|EPOLLOUT). + // This allows you to avoid con‐ + // tinuously switching between EPOLLIN and EPOLLOUT calling epoll_ctl(2) with EPOLL_CTL_MOD. + etRead = uint32(unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | unix.EPOLLOUT | EPOLLET) + etWrite = uint32(0) + etDelWrite = uint32(0) + etResetRead = uint32(0) + + // 一次性触发 + etReadOneShot = uint32(unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | unix.EPOLLOUT | EPOLLET | unix.EPOLLONESHOT) + etWriteOneShot = uint32(etReadOneShot) + etDelWriteOneShot = uint32(0) + etResetReadOneShot = uint32(etReadOneShot) +) + type epollState struct { epfd int events []unix.EpollEvent - et bool - parent *EventLoop + et bool + parent *EventLoop + rev uint32 + wev uint32 + dwEv uint32 // delete write event + resetEv uint32 +} + +func getReadWriteDeleteReset(oneShot bool, et bool) (uint32, uint32, uint32, uint32) { + if oneShot { + return etReadOneShot, etWriteOneShot, etDelWriteOneShot, etResetReadOneShot + } + + if et { + return etRead, etWrite, etDelWrite, etResetRead + } + + // if lt { + // return ltRead, ltWrite, ltDelWrite, ltResetRead + // } + + return 0, 0, 0, 0 } // 创建epoll handler @@ -49,7 +88,7 @@ func apiEpollCreate(parent *EventLoop) (la linuxApi, err error) { e.events = make([]unix.EpollEvent, 128) e.parent = parent - e.et = true + e.rev, e.wev, e.dwEv, e.resetEv = getReadWriteDeleteReset(false, true) return &e, nil } @@ -60,42 +99,53 @@ func (e *epollState) apiFree() { // 新加读事件 func (e *epollState) addRead(c *Conn) error { - fd := int(c.getFd()) - return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_ADD, fd, &unix.EpollEvent{ - Fd: int32(fd), - // 来自man 手册 - // When used as an edge-triggered interface, for performance reasons, - // it is possible to add the file descriptor inside the epoll interface (EPOLL_CTL_ADD) once by specifying (EPOLLIN|EPOLLOUT). - // This allows you to avoid con‐ - // tinuously switching between EPOLLIN and EPOLLOUT calling epoll_ctl(2) with EPOLL_CTL_MOD. - Events: unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | unix.EPOLLOUT | EPOLLET, - }) + if e.rev > 0 { + fd := int(c.getFd()) + return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_ADD, fd, &unix.EpollEvent{ + Fd: int32(fd), + Events: e.rev, + }) + } + return nil } // 新加写事件 func (e *epollState) addWrite(c *Conn) error { - if e.et { - return nil + if e.wev > 0 { + + fd := int(c.getFd()) + return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_MOD, fd, &unix.EpollEvent{ + Fd: int32(fd), + Events: e.wev, + }) } - fd := int(c.getFd()) - return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_MOD, fd, &unix.EpollEvent{ - Fd: int32(fd), - Events: unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | unix.EPOLLOUT, - }) + return nil } // 删除写事件 func (e *epollState) delWrite(c *Conn) error { - if e.et { - return nil + if e.dwEv > 0 { + + fd := int(c.getFd()) + return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_MOD, fd, &unix.EpollEvent{ + Fd: int32(fd), + Events: e.dwEv, + }) } + return nil +} - fd := int(c.getFd()) - return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_MOD, fd, &unix.EpollEvent{ - Fd: int32(fd), - Events: unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | EPOLLET, - }) +// 重装添加读事件 +func (e *epollState) resetRead(c *Conn) error { + fd := c.getFd() + if e.resetEv > 0 { + return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_MOD, int(fd), &unix.EpollEvent{ + Fd: int32(fd), + Events: e.resetEv, + }) + } + return nil } // 删除事件 @@ -123,7 +173,7 @@ func (e *epollState) apiPoll(tv time.Duration) (retVal int, err error) { numEvents = retVal for i := 0; i < numEvents; i++ { ev := &e.events[i] - conn := e.parent.parent.getConn(int(ev.Fd)) + conn := e.parent.getConn(int(ev.Fd)) if conn == nil { unix.Close(int(ev.Fd)) continue diff --git a/api_iouring.go b/api_iouring.go index 5f202af..2a55299 100644 --- a/api_iouring.go +++ b/api_iouring.go @@ -104,7 +104,7 @@ func (e *iouringState) getLogger() *slog.Logger { } func (e *iouringState) getConn(fd uint32) *Conn { - return e.parent.parent.getConn(int(fd)) + return e.parent.getConn(int(fd)) } // io-uring 处理事件的入口函数 diff --git a/api_iouring_process.go b/api_iouring_process.go index 40e30f2..56b42e7 100644 --- a/api_iouring_process.go +++ b/api_iouring_process.go @@ -10,33 +10,33 @@ import ( ) func (c *Conn) processRead(cqe *giouring.CompletionQueueEvent) error { - // 返回值小于等于0,表示读取完毕,关闭连接 - if cqe.Res <= 0 { - c.closeWithLock(io.EOF) - c.getLogger().Debug("read res <= 0", "res", cqe.Res, "fd", c.fd) - return nil - } - - c.getLogger().Debug("read res", "res", cqe.Res, "fd", c.fd) - - // 处理websocket数据 - c.rw += int(cqe.Res) - _, err := c.processWebsocketFrameOnlyIoUring() - if err != nil { - c.getLogger().Error("processWebsocketFrameOnlyIoUring", "err", err) - return err - } - - parent := c.getParent() - if parent == nil { - c.processClose(cqe) - c.getLogger().Info("parent is nil", "close", c.closed) - return nil - } - - if err := parent.addRead(c); err != nil { - return err - } + // // 返回值小于等于0,表示读取完毕,关闭连接 + // if cqe.Res <= 0 { + // c.closeWithLock(io.EOF) + // c.getLogger().Debug("read res <= 0", "res", cqe.Res, "fd", c.fd) + // return nil + // } + + // c.getLogger().Debug("read res", "res", cqe.Res, "fd", c.fd) + + // // 处理websocket数据 + // c.rw += int(cqe.Res) + // _, err := c.processWebsocketFrameOnlyIoUring() + // if err != nil { + // c.getLogger().Error("processWebsocketFrameOnlyIoUring", "err", err) + // return err + // } + + // parent := c.getParent() + // if parent == nil { + // c.processClose(cqe) + // c.getLogger().Info("parent is nil", "close", c.closed) + // return nil + // } + + // if err := parent.addRead(c); err != nil { + // return err + // } return nil } diff --git a/api_kqueue.go b/api_kqueue.go index ff50d65..1ac082c 100644 --- a/api_kqueue.go +++ b/api_kqueue.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ package greatws import ( "errors" "io" - "syscall" "time" "golang.org/x/sys/unix" @@ -87,13 +86,6 @@ func (e *EventLoop) addWrite(c *Conn) error { return e.trigger() } -func (e *EventLoop) del(fd int) error { - e.mu.Lock() - e.apiState.changes = append(e.apiState.changes, unix.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_READ}) - e.mu.Unlock() - return e.trigger() -} - func (e *EventLoop) apiPoll(tv time.Duration) (retVal int, err error) { state := e.apiState @@ -124,7 +116,7 @@ func (e *EventLoop) apiPoll(tv time.Duration) (retVal int, err error) { ev := &state.events[j] fd := int(ev.Ident) // fmt.Printf("fd :%d, filter :%x, flags :%x\n", fd, ev.Filter, ev.Flags) - conn := e.parent.getConn(fd) + conn := e.getConn(fd) if conn == nil { unix.Close(fd) continue diff --git a/api_linux.go b/api_linux.go index 2be3f6e..f23e125 100644 --- a/api_linux.go +++ b/api_linux.go @@ -1,4 +1,4 @@ -// Copyright 2023-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/autobahn/autobahn-server.go b/autobahn/autobahn-server.go index 74a6a79..f2e1c10 100644 --- a/autobahn/autobahn-server.go +++ b/autobahn/autobahn-server.go @@ -52,6 +52,7 @@ type handler struct { m *greatws.MultiEventLoop } +// 运行在业务线程 func (h *handler) echo(w http.ResponseWriter, r *http.Request) { opts := []greatws.ServerOption{ greatws.WithServerReplyPing(), @@ -74,6 +75,54 @@ func (h *handler) echo(w http.ResponseWriter, r *http.Request) { _ = c } +// 运行在io线程 +func (h *handler) echoRunInIo(w http.ResponseWriter, r *http.Request) { + opts := []greatws.ServerOption{ + greatws.WithServerReplyPing(), + greatws.WithServerDecompression(), + greatws.WithServerIgnorePong(), + greatws.WithServerCallback(&echoHandler{}), + greatws.WithServerEnableUTF8Check(), + greatws.WithServerReadTimeout(5 * time.Second), + greatws.WithServerMultiEventLoop(h.m), + greatws.WithServerCallbackInEventLoop(), + } + + if *runInEventLoop { + opts = append(opts, greatws.WithServerCallbackInEventLoop()) + } + + c, err := greatws.Upgrade(w, r, opts...) + if err != nil { + slog.Error("Upgrade fail:", "err", err.Error()) + } + _ = c +} + +// 使用stream模式运行, 一个websocket一个go程 +func (h *handler) echoRunStream(w http.ResponseWriter, r *http.Request) { + opts := []greatws.ServerOption{ + greatws.WithServerReplyPing(), + greatws.WithServerDecompression(), + greatws.WithServerIgnorePong(), + greatws.WithServerCallback(&echoHandler{}), + greatws.WithServerEnableUTF8Check(), + greatws.WithServerReadTimeout(5 * time.Second), + greatws.WithServerMultiEventLoop(h.m), + greatws.WithServerStreamMode(), + greatws.WithServerCallbackInEventLoop(), + } + + if *runInEventLoop { + opts = append(opts, greatws.WithServerCallbackInEventLoop()) + } + + c, err := greatws.Upgrade(w, r, opts...) + if err != nil { + slog.Error("Upgrade fail:", "err", err.Error()) + } + _ = c +} func main() { flag.Parse() @@ -97,6 +146,8 @@ func main() { }() mux := &http.ServeMux{} mux.HandleFunc("/autobahn", h.echo) + mux.HandleFunc("/autobahn-io", h.echoRunInIo) + mux.HandleFunc("/autobahn-stream", h.echoRunStream) rawTCP, err := net.Listen("tcp", ":9001") if err != nil { diff --git a/autobahn/config/fuzzingclient.json b/autobahn/config/fuzzingclient.json index 90388bd..ec5592f 100644 --- a/autobahn/config/fuzzingclient.json +++ b/autobahn/config/fuzzingclient.json @@ -2,11 +2,25 @@ "outdir": "./report/", "servers": [ { - "agent": "non-tls", + "agent": "non-tls-taskbind", "url": "ws://127.0.0.1:9001/autobahn", "options": { "version": 18 } + }, + { + "agent": "non-tls-io", + "url": "ws://127.0.0.1:9001/autobahn-io", + "options": { + "version": 18 + } + }, + { + "agent": "non-tls-stream", + "url": "ws://127.0.0.1:9001/autobahn-stream", + "options": { + "version": 18 + } } ], "cases": [ diff --git a/callback.go b/callback.go index cb45186..0785083 100644 --- a/callback.go +++ b/callback.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/client.go b/client.go index 361c730..7b04e5e 100644 --- a/client.go +++ b/client.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/client_options.go b/client_options.go index 0c67660..85952df 100644 --- a/client_options.go +++ b/client_options.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/common_options.go b/common_options.go index b97e0d7..569a90c 100644 --- a/common_options.go +++ b/common_options.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -294,6 +294,20 @@ func WithClientCallbackInEventLoop() ClientOption { } } +// 19.1 配置服务端使用stream模式处理请求,该模式一个连接会独占一个go程,如果你的请求对时序有要求,可以使用这个模式 +func WithServerStreamMode() ServerOption { + return func(o *ConnOption) { + o.runInGoStrategy = taskStrategyStream + } +} + +// 19.2 配置客户端使用stream模式处理请求,该模式一个连接会独占一个go程,如果你的请求对时序有要求,可以使用这个模式 +func WithClientStreamMode() ClientOption { + return func(o *DialOption) { + o.runInGoStrategy = taskStrategyStream + } +} + // last 配置event func WithServerMultiEventLoop(m *MultiEventLoop) ServerOption { return func(o *ConnOption) { diff --git a/config.go b/config.go index 1697a16..39e5b85 100644 --- a/config.go +++ b/config.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -38,6 +38,7 @@ type Config struct { subProtocols []string // 设置支持的子协议 multiEventLoop *MultiEventLoop callbackInEventLoop bool // 在event loop中运行websocket OnOpen, OnMessage, OnClose 回调函数 + runInGoStrategy taskStrategy } func (c *Config) useIoUring() bool { @@ -55,6 +56,7 @@ func (c *Config) defaultSetting() { c.windowsMultipleTimesPayloadSize = 1.0 c.delayWriteInitBufferSize = 8 * 1024 c.maxDelayWriteDuration = 10 * time.Millisecond + c.runInGoStrategy = taskStrategyBind c.tcpNoDelay = true // c.parseMode = ParseModeWindows // 对于text消息,默认不检查text是utf8字符 diff --git a/conn.go b/conn.go index aa8bb00..e97cd66 100644 --- a/conn.go +++ b/conn.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -75,12 +75,25 @@ func (c *Conn) getLogger() *slog.Logger { return c.multiEventLoop.Logger } -func (c *Conn) getTask() Tasker { +func (c *Conn) addTask(ts taskStrategy, f func() bool) { + if c.isClosed() { + return + } + if c.callbackInEventLoop { - return &c.multiEventLoop.t2 + c.multiEventLoop.runInIo.addTask(ts, f) + return + } + if ts == taskStrategyStream { + c.taskStream.addTask(ts, f) + return } - return &c.multiEventLoop.t + if err := c.parent.localTask.addTask(c, ts, f); err == ErrTaskQueueFull { + if err = c.multiEventLoop.globalTask.addTask(c, ts, f); err == ErrTaskQueueFull { + // TODO + } + } } func (c *Conn) getFd() int { @@ -308,7 +321,7 @@ func (c *Conn) processCallback(f frame.Frame2) (err error) { c.fragmentFramePayload = nil // 进入业务协程执行 - c.getTask().addTask(func() (exit bool) { + c.addTask(c.runInGoStrategy, func() (exit bool) { if fragmentFrameHeader.GetRsv1() && decompression { tempBuf, err := decode(*fragmentFramePayload) if err != nil { @@ -368,7 +381,7 @@ func (c *Conn) processCallback(f frame.Frame2) (err error) { // payloadPtr.Store(f.Payload) // 进入业务协程执行 - c.getTask().addTask(func() bool { + c.addTask(c.runInGoStrategy, func() bool { // payload := payloadPtr.Load() if needMask { @@ -456,7 +469,7 @@ func (c *Conn) processCallback(f frame.Frame2) (err error) { } // 进入业务协程执行 payload := f.Payload - c.getTask().addTask(func() bool { + c.addTask(c.runInGoStrategy, func() bool { c.Callback.OnMessage(c, f.Opcode, *payload) PutPayloadBytes(payload) return false @@ -470,7 +483,7 @@ func (c *Conn) processCallback(f frame.Frame2) (err error) { } // 进入业务协程执行 - c.getTask().addTask(func() bool { + c.addTask(c.runInGoStrategy, func() bool { c.Callback.OnMessage(c, f.Opcode, nil) return false }) @@ -570,3 +583,7 @@ func (c *Conn) WriteMessage(op Opcode, writeBuf []byte) (err error) { } return err } + +func (c *Conn) Close() { + c.closeWithLock(nil) +} diff --git a/conn_unix.go b/conn_unix.go index 61f88c3..cba7c2e 100644 --- a/conn_unix.go +++ b/conn_unix.go @@ -1,3 +1,17 @@ +// Copyright 2023-2024 antlabs. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + //go:build linux || darwin || netbsd || freebsd || openbsd || dragonfly // +build linux darwin netbsd freebsd openbsd dragonfly @@ -63,21 +77,15 @@ type Conn struct { // 存在io-uring相关的控制信息 // onlyIoUringState - wbuf *[]byte // 写缓冲区, 当直接Write失败时,会将数据写入缓冲区 - mu sync.Mutex - client bool // 客户端为true,服务端为false - *Config // 配置 - closed int32 // 是否关闭 - closeOnce sync.Once - parent *EventLoop -} - -func (c *Conn) setParent(el *EventLoop) { - atomic.StorePointer((*unsafe.Pointer)((unsafe.Pointer)(&c.parent)), unsafe.Pointer(el)) -} - -func (c *Conn) getParent() *EventLoop { - return (*EventLoop)(atomic.LoadPointer((*unsafe.Pointer)((unsafe.Pointer)(&c.parent)))) + wbuf *[]byte // 写缓冲区, 当直接Write失败时,会将数据写入缓冲区 + mu sync.Mutex + client bool // 客户端为true,服务端为false + *Config // 配置 + closed int32 // 是否关闭 + closeOnce sync.Once + parent *EventLoop + currBindGo *businessGo + taskStream taskStream } func newConn(fd int64, client bool, conf *Config) *Conn { @@ -90,8 +98,12 @@ func newConn(fd int64, client bool, conf *Config) *Conn { // 初始化不分配内存,只有在需要的时候才分配 Config: conf, client: client, + parent: conf.multiEventLoop.getEventLoop(int(fd)), } + if conf.runInGoStrategy == taskStrategyStream { + c.taskStream.init() + } return c } @@ -102,11 +114,8 @@ func duplicateSocket(socketFD int) (int, error) { func (c *Conn) closeInner(err error) { fd := c.getFd() c.getLogger().Debug("close conn", slog.Int64("fd", int64(fd))) - c.multiEventLoop.del(c) + c.parent.del(c) atomic.StoreInt64(&c.fd, -1) - c.closeOnce.Do(func() { - atomic.StorePointer((*unsafe.Pointer)((unsafe.Pointer)(&c.parent)), nil) - }) atomic.StoreInt32(&c.closed, 1) } @@ -116,12 +125,17 @@ func (c *Conn) closeWithLock(err error) { } c.mu.Lock() + if c.isClosed() { + c.mu.Unlock() + return + } + + if c.Config.runInGoStrategy == taskStrategyStream { + c.taskStream.close() + } c.closeInner(err) - c.mu.Unlock() -} -func (c *Conn) Close() { - c.closeWithLock(nil) + c.mu.Unlock() } func (c *Conn) getPtr() int { @@ -147,7 +161,7 @@ func (c *Conn) Write(b []byte) (n int, err error) { if old != c.wbuf { PutPayloadBytes(old) } - b = *c.wbuf + return curN, nil } } @@ -168,7 +182,7 @@ func (c *Conn) Write(b []byte) (n int, err error) { c.wbuf = newBuf } - if err = c.multiEventLoop.addWrite(c); err != nil { + if err = c.parent.addWrite(c); err != nil { return total, err } } @@ -238,7 +252,7 @@ func (c *Conn) flushOrCloseInner(needLock bool) (err error) { PutPayloadBytes(old) c.wbuf = nil - if err := c.multiEventLoop.delWrite(c); err != nil { + if err := c.parent.delWrite(c); err != nil { return err } } else { @@ -246,6 +260,9 @@ func (c *Conn) flushOrCloseInner(needLock bool) (err error) { // 记录写入的数据,如果有写入,分配一个新的缓冲区 if total > 0 && ws == writeEagain { newBuf := GetPayloadBytes(len(*old) - total) + if len(*newBuf) < len(*old)-total { + panic("newBuf is too small") + } copy(*newBuf, (*old)[total:]) if c.wbuf != nil { PutPayloadBytes(c.wbuf) @@ -253,7 +270,7 @@ func (c *Conn) flushOrCloseInner(needLock bool) (err error) { c.wbuf = newBuf } - if err = c.multiEventLoop.addWrite(c); err != nil { + if err = c.parent.addWrite(c); err != nil { return err } } @@ -267,7 +284,7 @@ func (c *Conn) flushOrCloseInner(needLock bool) (err error) { "write_state", ws.String()) } else { c.getLogger().Debug("wbuf is nil", "fd", c.getFd()) - if err := c.multiEventLoop.delWrite(c); err != nil { + if err := c.parent.delWrite(c); err != nil { return err } } diff --git a/err.go b/err.go index bd9e7ee..9913a50 100644 --- a/err.go +++ b/err.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/event_loop.go b/event_loop.go index 4298b75..354615a 100644 --- a/event_loop.go +++ b/event_loop.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package greatws import ( "context" "sync" + "sync/atomic" "time" ) @@ -34,14 +35,19 @@ type EventLoop struct { *apiState // 每个平台对应的异步io接口/epoll/kqueue/iouring shutdown bool parent *MultiEventLoop + localTask task } // 初始化函数 -func CreateEventLoop(setSize int, flag evFlag) (e *EventLoop, err error) { +func CreateEventLoop(setSize int, flag evFlag, parent *MultiEventLoop) (e *EventLoop, err error) { e = &EventLoop{ setSize: setSize, maxFd: -1, + parent: parent, } + e.localTask.taskConfig = e.parent.globalTask.taskConfig + // e.localTask.initWithNoMutex() + e.localTask.init() err = e.apiCreate(flag) return e, err } @@ -65,6 +71,22 @@ func (el *EventLoop) Loop() { } } +// 获取一个连接 +func (m *EventLoop) getConn(fd int) *Conn { + + v, ok := m.conns.Load(fd) + if !ok { + return nil + } + return v.(*Conn) +} + +func (el *EventLoop) del(c *Conn) { + fd := c.getFd() + atomic.AddInt64(&el.parent.curConn, -1) + el.conns.Delete(fd) + closeFd(fd) +} func (el *EventLoop) GetApiName() string { return el.apiName() } diff --git a/multi_event_loops.go b/multi_event_loops.go index fa7c188..9d107e7 100644 --- a/multi_event_loops.go +++ b/multi_event_loops.go @@ -1,3 +1,16 @@ +// Copyright 2023-2024 antlabs. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package greatws import ( @@ -11,45 +24,48 @@ type MultiEventLoop struct { numLoops int // 事件循环数量 maxEventNum int loops []*EventLoop - t task - t2 taskIo + globalTask task + runInIo taskIo flag evFlag // 是否使用io_uring level slog.Level stat // 统计信息 *slog.Logger } -func (m *MultiEventLoop) initDefaultSettingBefore() { - m.level = slog.LevelError // 默认打印error级别的日志 - m.numLoops = 0 - m.maxEventNum = 10000 - m.t.min = 50 - m.t.initCount = 1000 - m.t.max = 30000 -} +var ( + defMaxEventNum = 256 + defTaskMin = 50 + defTaskMax = 30000 + defTaskInitCount = 1000 + defNumLoops = runtime.NumCPU() / 4 +) -func (m *MultiEventLoop) initDefaultSettingAfter() { +func (m *MultiEventLoop) initDefaultSetting() { + m.level = slog.LevelError // 默认打印error级别的日志 if m.numLoops == 0 { - m.numLoops = runtime.NumCPU() / 4 - if m.numLoops == 0 { - m.numLoops = 1 - } + m.numLoops = max(defNumLoops, 1) } if m.maxEventNum == 0 { - m.maxEventNum = 256 + m.maxEventNum = defMaxEventNum } - if m.t.min == 0 { - m.t.min = 50 + if m.globalTask.min == 0 { + m.globalTask.min = max(defTaskMin/(m.numLoops+1), 1) + } else { + m.globalTask.min = max(m.globalTask.min/(m.numLoops+1), 1) } - if m.t.initCount == 0 { - m.t.initCount = 1000 + if m.globalTask.max == 0 { + m.globalTask.max = max(defTaskMax/(m.numLoops+1), 1) + } else { + m.globalTask.max = max(m.globalTask.max/(m.numLoops+1), 1) } - if m.t.max == 0 { - m.t.max = 30000 + if m.globalTask.initCount == 0 { + m.globalTask.initCount = max(defTaskInitCount/(m.numLoops+1), 1) + } else { + m.globalTask.initCount = max(m.globalTask.initCount/(m.numLoops+1), 1) } if m.flag == 0 { @@ -71,22 +87,20 @@ func NewMultiEventLoopMust(opts ...EvOption) *MultiEventLoop { func NewMultiEventLoop(opts ...EvOption) (e *MultiEventLoop, err error) { m := &MultiEventLoop{} - m.initDefaultSettingBefore() + m.initDefaultSetting() for _, o := range opts { o(m) } - m.initDefaultSettingAfter() - - m.t.init() + m.initDefaultSetting() + m.globalTask.init() m.loops = make([]*EventLoop, m.numLoops) for i := 0; i < m.numLoops; i++ { - m.loops[i], err = CreateEventLoop(m.maxEventNum, m.flag) + m.loops[i], err = CreateEventLoop(m.maxEventNum, m.flag, m) if err != nil { return nil, err } - m.loops[i].parent = m } return m, nil } @@ -98,68 +112,19 @@ func (m *MultiEventLoop) Start() { } } +func (m *MultiEventLoop) getEventLoop(fd int) *EventLoop { + return m.loops[fd%len(m.loops)] +} + // 添加一个连接到多路事件循环 func (m *MultiEventLoop) add(c *Conn) error { fd := c.getFd() index := fd % len(m.loops) m.loops[index].conns.Store(fd, c) if err := m.loops[index].addRead(c); err != nil { - m.del(c) + m.loops[index].del(c) return err } - c.setParent(m.loops[index]) atomic.AddInt64(&m.curConn, 1) return nil } - -// 添加一个可写事件到多路事件循环 -func (m *MultiEventLoop) addWrite(c *Conn) error { - fd := c.getFd() - if fd == -1 { - return nil - } - index := fd % len(m.loops) - if err := m.loops[index].addWrite(c); err != nil { - return err - } - m.loops[index].conns.LoadOrStore(fd, c) - return nil -} - -// 添加一个可写事件到多路事件循环 -func (m *MultiEventLoop) delWrite(c *Conn) error { - fd := c.getFd() - if fd == -1 { - return nil - } - - index := fd % len(m.loops) - if err := m.loops[index].delWrite(c); err != nil { - return err - } - m.loops[index].conns.LoadOrStore(fd, c) - return nil -} - -// 从多路事件循环中删除一个连接 -func (m *MultiEventLoop) del(c *Conn) { - fd := c.getFd() - - if fd == -1 { - return - } - atomic.AddInt64(&m.curConn, -1) - index := fd % len(m.loops) - m.loops[index].conns.Delete(fd) - closeFd(fd) -} - -// 获取一个连接 -func (m *MultiEventLoop) getConn(fd int) *Conn { - index := fd % len(m.loops) - v, ok := m.loops[index].conns.Load(fd) - if !ok { - return nil - } - return v.(*Conn) -} diff --git a/multi_event_loops_option.go b/multi_event_loops_option.go index 0c13d85..3edb30c 100644 --- a/multi_event_loops_option.go +++ b/multi_event_loops_option.go @@ -1,3 +1,16 @@ +// Copyright 2021-2024 antlabs. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package greatws import "log/slog" @@ -17,9 +30,9 @@ func WithEventLoops(num int) EvOption { // max: 最大协程数 func WithBusinessGoNum(initCount, min, max int) EvOption { return func(e *MultiEventLoop) { - e.t.initCount = initCount - e.t.min = min - e.t.max = max + e.globalTask.initCount = initCount + e.globalTask.min = min + e.globalTask.max = max } } diff --git a/opcode.go b/opcode.go index a2f52db..d714e32 100644 --- a/opcode.go +++ b/opcode.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2021-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/payload_big_pool.go b/payload_big_pool.go index 4fc8f28..2bec973 100644 --- a/payload_big_pool.go +++ b/payload_big_pool.go @@ -1,3 +1,16 @@ +// Copyright 2023-2024 antlabs. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package greatws import "sync" diff --git a/payload_pool.go b/payload_pool.go index 0c6ff79..f228165 100644 --- a/payload_pool.go +++ b/payload_pool.go @@ -1,3 +1,16 @@ +// Copyright 2023-2024 antlabs. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package greatws import ( diff --git a/payload_pool_test.go b/payload_pool_test.go index 1ee17be..0cea1a2 100644 --- a/payload_pool_test.go +++ b/payload_pool_test.go @@ -1,3 +1,16 @@ +// Copyright 2023-2024 antlabs. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package greatws import ( diff --git a/server_handshake.go b/server_handshake.go index 1407b9d..0568dd5 100644 --- a/server_handshake.go +++ b/server_handshake.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/server_options.go b/server_options.go index a90d51d..1bddb93 100644 --- a/server_options.go +++ b/server_options.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/stat.go b/stat.go index 60220aa..18a7393 100644 --- a/stat.go +++ b/stat.go @@ -1,3 +1,16 @@ +// Copyright 2023-2024 antlabs. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package greatws import "sync/atomic" @@ -14,13 +27,27 @@ type stat struct { pollEv int64 // poll事件次数, 包含读,写, 错误事件 } -func (m *MultiEventLoop) HighLoad() bool { - return m.t.highLoad() -} +// func (m *MultiEventLoop) HighLoad() bool { +// return m.globalTask.highLoad() +// } // 对外接口,查询当前业务协程池个数 -func (m *MultiEventLoop) GetCurGoNum() int { - return int(m.t.getCurGo()) +func (m *MultiEventLoop) GetCurGoNum() (total int) { + total += int(m.globalTask.getCurGo()) + for _, v := range m.loops { + total += int(v.localTask.getCurGo()) + } + return +} + +// 对外接口,查询业务协程池运行的当前业务数 +func (m *MultiEventLoop) GetCurTaskNum() (total int64) { + + total += m.globalTask.getCurTask() + for _, v := range m.loops { + total += v.localTask.getCurTask() + } + return } // 对外接口,查询移动字节数 @@ -48,11 +75,6 @@ func (m *MultiEventLoop) GetCurConnNum() int64 { return atomic.LoadInt64(&m.curConn) } -// 对外接口,查询业务协程池运行的当前业务数 -func (m *MultiEventLoop) GetCurTaskNum() int64 { - return m.t.getCurTask() -} - // 对外接口,查询poll read事件次数 func (m *MultiEventLoop) GetReadEvNum() int64 { return atomic.LoadInt64(&m.readEv) diff --git a/status_codes.go b/status_codes.go index 6340c55..c64577f 100644 --- a/status_codes.go +++ b/status_codes.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/task.go b/task.go index 1c0dc93..bc07037 100644 --- a/task.go +++ b/task.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2021-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,28 +14,58 @@ package greatws import ( + "errors" + "sync" "sync/atomic" "time" ) +// 运行task的策略 +// 1. 随机 +// 2. 取余映射 +type taskStrategy int + +const ( + // 随机。默认 + taskStrategyRandom taskStrategy = iota + // 绑定映射 + taskStrategyBind + // 流式映射 + taskStrategyStream +) + +var ErrTaskQueueFull = errors.New("task queue full") + var exitFunc = func() bool { return true } +type taskConfig struct { + initCount int // 初始化的协程数 + min int // 最小协程数 + max int // 最大协程数 +} + type task struct { - c chan func() bool + c chan func() bool + windows windows // 滑动窗口计数,用于判断是否需要新增go程 + taskConfig + curGo int64 // 当前运行协程数 + curTask int64 // 当前运行任务数 - initCount int // 初始化的协程数 - min int // 最小协程数 - max int // 最大协程数 - curGo int64 // 当前运行协程数 - curTask int64 // 当前运行任务数 + allBusinessGo sync.Map // key是[*businessGo, struct{}], 目前先这样。后面再优化下 } -func (t *task) init() { +// 初始化 +func (t *task) initInner() { t.c = make(chan func() bool) + t.windows.init() go t.manageGo() go t.runConsumerLoop() } +func (t *task) init() { + t.initInner() +} + func (t *task) getCurGo() int64 { return atomic.LoadInt64(&t.curGo) } @@ -47,107 +77,176 @@ func (t *task) getCurTask() int64 { // 消费者循环 func (t *task) consumer() { defer atomic.AddInt64(&t.curGo, -1) - for f := range t.c { + currBusinessGo := newBusinessGo() + t.allBusinessGo.Store(currBusinessGo, struct{}{}) + + var f func() bool + for { + select { + case f = <-t.c: + case f = <-currBusinessGo.taskChan: + } atomic.AddInt64(&t.curTask, 1) if b := f(); b { atomic.AddInt64(&t.curTask, -1) + if !currBusinessGo.canKill() { + continue + } + break } atomic.AddInt64(&t.curTask, -1) } } -func (t *task) highLoad() bool { - // curGo := atomic.LoadInt64(&t.curGo) - for i := 1; ; i++ { +// func (t *task) highLoad() bool { +// // curGo := atomic.LoadInt64(&t.curGo) +// for i := 1; ; i++ { - curTask := atomic.LoadInt64(&t.curTask) +// curTask := atomic.LoadInt64(&t.curTask) - if curTask >= int64(t.curGo) { - return true - } +// if curTask >= int64(t.curGo) { +// return true +// } - // 这里的判断条件不准确,因为curGO是表示go程多少,不能表示任务多少, 比如1w上go程,一个任务也不跑 - // if curGo := atomic.LoadInt64(&t.curGo); curGo > int64(t.max) { - // return true - // } +// // 这里的判断条件不准确,因为curGo是表示go程多少,不能表示任务多少, 比如1w上go程,一个任务也不跑 +// // if curGo := atomic.LoadInt64(&t.curGo); curGo > int64(t.max) { +// // return true +// // } - if !t.needResize() { - return false - } +// if need, _ := t.needGrow(); !need { +// return false +// } - curGo := atomic.LoadInt64(&t.curGo) - maxGo := int64(t.max) - need := min(2*i, max(0, int(maxGo-curGo))) - if need > 0 { - t.addGoNum(need) - } - } +// curGo := atomic.LoadInt64(&t.curGo) +// maxGo := int64(t.max) +// need := min(2*i, max(0, int(maxGo-curGo))) +// if need > 0 { +// t.addGoNum(need) +// } +// } +// } + +// 随机获取一个go程 +func (t *task) randomGo() *businessGo { + var currBusinessGo *businessGo + t.allBusinessGo.Range(func(key, value interface{}) bool { + currBusinessGo = key.(*businessGo) + return false + }) + return currBusinessGo } // 新增任务, 如果任务队列满了, 新增go程, 这可能会导致协程数超过最大值, 为了防止死锁,还是需要新增业务go程 // 在io线程里面会判断go程池是否高负载,如果是高负载,会取消read的任务, 放到wbuf里面, 延后再处理 -func (t *task) addTask(f func() bool) { +func (t *task) addTask(c *Conn, ts taskStrategy, f func() bool) error { + + if ts == taskStrategyBind { + if c.currBindGo == nil { + c.currBindGo = t.randomGo() + } + currChan := c.currBindGo.taskChan + // 如果任务未满,直接放入任务队列 + if len(currChan) < cap(currChan) { + currChan <- f + return nil + } + + } + + if len(t.c) >= cap(t.c) { + return ErrTaskQueueFull + } t.c <- f + return nil } -// func (t *task) addTask(f func() bool) { -// for { -// select { -// case t.c <- f: -// return -// case <-time.After(time.Millisecond * 250): -// t.addGo() -// } -// } -// } - // 新增go程 func (t *task) addGo() { - go func() { - atomic.AddInt64(&t.curGo, 1) - defer atomic.AddInt64(&t.curGo, -1) - t.consumer() - }() + atomic.AddInt64(&t.curGo, 1) + defer atomic.AddInt64(&t.curGo, -1) + t.consumer() } func (t *task) addGoNum(n int) { for i := 0; i < n; i++ { - t.addGo() + go t.addGo() } } // 取消go程 -func (t *task) cancelGo() { - if atomic.LoadInt64(&t.curGo) > int64(t.min) { +func (t *task) cancelGoNum(sharkSize int) { + + if atomic.LoadInt64(&t.curGo) < int64(t.min) { + return + } + for i := 0; i < sharkSize; i++ { + if atomic.LoadInt64(&t.curGo) < int64(t.min) { + return + } t.c <- exitFunc } + } -func (t *task) needResize() bool { +// 需要扩容 +func (t *task) needGrow() (bool, int) { if int(t.curGo) > t.max { - return false + return false, 0 + } + + curTask := atomic.LoadInt64(&t.curTask) + curGo := atomic.LoadInt64(&t.curGo) + avg := t.windows.avg() + need := (float64(curTask)/float64(curGo)) > 0.8 && curGo > int64(avg) + + if need { + + if avg*2 < 8 { + return true, 16 + } + + if avg*2 < 1024 { + return true, int(avg * 2) + } + + return true, int(float64(t.curGo) * 1.25) + } + + return false, 0 +} + +func (t *task) needShrink() (bool, int) { + // 小于最小值直接忽略收缩 + if int(t.curGo) <= t.min { + return false, 0 } curTask := atomic.LoadInt64(&t.curTask) curGo := atomic.LoadInt64(&t.curGo) - return (float64(curTask) / float64(curGo)) > 0.8 + + need := (float64(curTask)/float64(curGo)) < 0.25 && curGo < int64(t.windows.avg()) + return need, int(float64(t.curGo) * 0.75) } // 管理go程 func (t *task) manageGo() { - for { - time.Sleep(time.Second * 5) - curTask := atomic.LoadInt64(&t.curTask) + for { + time.Sleep(time.Second * 1) + // 当前运行的go程数 curGo := atomic.LoadInt64(&t.curGo) + // 记录当前运行的任务数 + t.windows.add(curGo) - if curTask < int64(t.min) && curGo > int64(t.min) { - t.cancelGo() - } else if t.needResize() { - t.addGo() + // 1分钟内不考虑收缩go程 + if need, shrinkSize := t.needShrink(); need { + t.cancelGoNum(shrinkSize) + } else if need, newSize := t.needGrow(); need { + t.addGoNum(newSize - int(curGo)) } } + } // 运行任务 diff --git a/task_business_go.go b/task_business_go.go new file mode 100644 index 0000000..b18d1b6 --- /dev/null +++ b/task_business_go.go @@ -0,0 +1,47 @@ +// Copyright 2021-2024 antlabs. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package greatws + +import "sync/atomic" + +type businessGo struct { + taskChan chan func() bool + // 被多少conn绑定 + bindConnCount int64 +} + +// 新增绑定的conn数 +func (b *businessGo) addBinConnCount(f func() bool) { + atomic.AddInt64(&b.bindConnCount, 1) +} + +// 减少绑定的conn数 +func (b *businessGo) subBinConnCount(f func() bool) { + atomic.AddInt64(&b.bindConnCount, -1) +} + +// 是否可以杀死这个go程 +func (b *businessGo) canKill() bool { + curConn := atomic.LoadInt64(&b.bindConnCount) + if curConn < 0 { + panic("current conn < 0") + } + return curConn == 0 +} + +func newBusinessGo() *businessGo { + return &businessGo{ + taskChan: make(chan func() bool, 3), + } +} diff --git a/task_io.go b/task_io.go index ba51dcd..1368ebd 100644 --- a/task_io.go +++ b/task_io.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,6 +15,6 @@ package greatws type taskIo struct{} -func (t *taskIo) addTask(f func() bool) { +func (t *taskIo) addTask(ts taskStrategy, f func() bool) { f() } diff --git a/task_stream.go b/task_stream.go new file mode 100644 index 0000000..27ab727 --- /dev/null +++ b/task_stream.go @@ -0,0 +1,58 @@ +// Copyright 2023-2024 antlabs. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package greatws + +import ( + "sync" + "sync/atomic" +) + +type taskStream struct { + streamChan chan func() bool + sync.Once + closed uint32 +} + +func (t *taskStream) loop() { + for cb := range t.streamChan { + cb() + } +} +func (t *taskStream) init() { + t.streamChan = make(chan func() bool, 3) + go t.loop() +} + +func (t *taskStream) addTask(ts taskStrategy, f func() bool) { + if atomic.LoadUint32(&t.closed) == 1 { + return + } + + defer func() { + if err := recover(); err != nil { + + } + }() + t.streamChan <- f + // TODO阻塞的情况如何处理? + // 默认启动oneshot模式 +} + +func (t *taskStream) close() { + t.Do(func() { + close(t.streamChan) + atomic.StoreUint32(&t.closed, 1) + }) +} diff --git a/task_window.go b/task_window.go new file mode 100644 index 0000000..c268af3 --- /dev/null +++ b/task_window.go @@ -0,0 +1,40 @@ +// Copyright 2021-2024 antlabs. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package greatws + +// 滑动窗口记录历史go程数 +type windows struct { + // 历史go程数 + historyGo []int64 + sum int64 + w int64 +} + +func (w *windows) init() { + w.historyGo = make([]int64, 10) +} + +func (w *windows) add(goNum int64) { + if len(w.historyGo) == 0 { + return + } + + w.sum += goNum - w.historyGo[w.w] + w.historyGo[w.w] = goNum + w.w = (w.w + 1) % int64(len(w.historyGo)) +} + +func (w *windows) avg() float64 { + return float64(w.sum) / float64(len(w.historyGo)) +} diff --git a/task_window_test.go b/task_window_test.go new file mode 100644 index 0000000..05b48c3 --- /dev/null +++ b/task_window_test.go @@ -0,0 +1,36 @@ +// Copyright 2021-2024 antlabs. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package greatws + +import ( + "slices" + "testing" +) + +func Test_Windows(t *testing.T) { + t.Run("test_windows.0", func(t *testing.T) { + var w windows + w.init() + w.add(1) + if !slices.Equal(w.historyGo, []int64{1, 0, 0}) { + // 报错 + t.Errorf("w.historyGo is fail") + } + + if w.avg() > 0.5 { + t.Errorf("w.avg > 0.5") + } + }) +} diff --git a/tasker.go b/tasker.go index 4c96422..168875c 100644 --- a/tasker.go +++ b/tasker.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,5 +15,5 @@ package greatws type Tasker interface { - addTask(f func() bool) + addTask(fd int, ts taskStrategy, f func() bool) } diff --git a/upgrade.go b/upgrade.go index 9c02dc3..6cda06e 100644 --- a/upgrade.go +++ b/upgrade.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -136,6 +136,7 @@ func upgradeInner(w http.ResponseWriter, r *http.Request, conf *Config) (c *Conn } c = newConn(int64(fd), false, conf) + conf.Callback.OnOpen(c) if err = conf.multiEventLoop.add(c); err != nil { return nil, err } diff --git a/utils.go b/utils.go index 314a17f..b4e7bbc 100644 --- a/utils.go +++ b/utils.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 antlabs. All rights reserved. +// Copyright 2023-2024 antlabs. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.