Skip to content

Commit

Permalink
Merge pull request #15 from antlabs/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
guonaihong authored Jan 28, 2024
2 parents e5aab12 + 39b2640 commit a8db72b
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 69 deletions.
2 changes: 1 addition & 1 deletion api_epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (e *epollState) apiPoll(tv time.Duration) (retVal int, err error) {
e.parent.parent.addReadEvNum()

// 读取数据,这里要发行下websocket的解析变成流式解析
_, err = conn.processWebsocketFrame()
err = conn.processWebsocketFrame()
if err != nil {
conn.closeWithLock(err)
}
Expand Down
2 changes: 1 addition & 1 deletion api_kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (e *EventLoop) apiPoll(tv time.Duration) (retVal int, err error) {
continue
}
// 读取数据,这里要发行下websocket的解析变成流式解析
_, err = conn.processWebsocketFrame()
err = conn.processWebsocketFrame()
if err != nil {
conn.closeWithLock(err)
continue
Expand Down
3 changes: 2 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (
type conn struct {
fd int64 // 文件描述符fd
rbuf *[]byte // 读缓冲区
lastPayloadLen int64 // 上一次读取的payload长度
rr int // rbuf读索引
rw int // rbuf写索引
curState frameState // 保存当前状态机的状态
Expand Down Expand Up @@ -292,7 +293,7 @@ func (c *Conn) readPayload() (f frame.Frame2, success bool, err error) {
if needRead > 0 {
return
}

c.lastPayloadLen = c.rh.PayloadLen
// 普通frame
newBuf := GetPayloadBytes(int(c.rh.PayloadLen))
copy(*newBuf, (*c.rbuf)[c.rr:c.rr+int(c.rh.PayloadLen)])
Expand Down
146 changes: 81 additions & 65 deletions conn_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"unsafe"

"github.com/antlabs/wsutil/bytespool"
"github.com/antlabs/wsutil/enum"
"golang.org/x/sys/unix"
)

Expand Down Expand Up @@ -74,26 +75,21 @@ func (s writeState) String() string {
type Conn struct {
conn

// 存在io-uring相关的控制信息
// onlyIoUringState

wbuf *[]byte // 写缓冲区, 当直接Write失败时,会将数据写入缓冲区
mu sync.Mutex
client bool // 客户端为true,服务端为false
*Config // 配置
closed int32 // 是否关闭
closeOnce sync.Once
parent *EventLoop
wbuf *[]byte // 写缓冲区, 当直接Write失败时,会将数据写入缓冲区
mu sync.Mutex // 锁
client bool // 客户端为true,服务端为false
*Config // 配置
closed int32 // 是否关闭
closeOnce sync.Once // 关闭一次
parent *EventLoop // event loop
currBindGo *businessGo // 绑定模式下,当前绑定的go程
streamGo taskStream // stream模式下,当前绑定的go程
}

func newConn(fd int64, client bool, conf *Config) *Conn {
rbuf := bytespool.GetBytes(conf.initPayloadSize())
c := &Conn{
conn: conn{
fd: fd,
rbuf: rbuf,
fd: fd,
},
// 初始化不分配内存,只有在需要的时候才分配
Config: conf,
Expand Down Expand Up @@ -309,80 +305,100 @@ func (c *Conn) flushOrClose() (err error) {
// 有几种情况需要处理下
// 1. 缓冲区空间不句够,需要扩容
// 2. 缓冲区数据不够,并且一次性读取了多个frame
func (c *Conn) processWebsocketFrame() (n int, err error) {
func (c *Conn) processWebsocketFrame() (err error) {
// 1. 处理frame header
// if !c.useIoUring() {
if true {
// 不使用io_uring的直接调用read获取buffer数据
for i := 0; ; i++ {
fd := atomic.LoadInt64(&c.fd)
n, err = unix.Read(int(fd), (*c.rbuf)[c.rw:])
c.multiEventLoop.addReadSyscall()
// fmt.Printf("i = %d, n = %d, fd = %d, rbuf = %d, rw:%d, err = %v, %v, payload:%d\n", i, n, c.fd, len((*c.rbuf)[c.rw:]), c.rw+n, err, time.Now(), c.rh.PayloadLen)
if err != nil {
// 信号中断,继续读
if errors.Is(err, unix.EINTR) {
continue
}
// 出错返回
if !errors.Is(err, unix.EAGAIN) && !errors.Is(err, unix.EWOULDBLOCK) {
return 0, err
}
// 缓冲区没有数据,等待可读
err = nil
break
}
if c.rbuf == nil {
c.rbuf = bytespool.GetBytes(int(float32(c.rh.PayloadLen+enum.MaxFrameHeaderSize) * c.windowsMultipleTimesPayloadSize))
}

// 读到eof,直接关闭
if n == 0 && len((*c.rbuf)[c.rw:]) > 0 {
c.closeWithLock(io.EOF)
c.OnClose(c, io.EOF)
return
n := 0
var success bool
// 不使用io_uring的直接调用read获取buffer数据
for i := 0; ; i++ {
fd := atomic.LoadInt64(&c.fd)
n, err = unix.Read(int(fd), (*c.rbuf)[c.rw:])
c.multiEventLoop.addReadSyscall()
// fmt.Printf("i = %d, n = %d, fd = %d, rbuf = %d, rw:%d, err = %v, %v, payload:%d\n",
// i, n, c.fd, len((*c.rbuf)[c.rw:]), c.rw+n, err, time.Now(), c.rh.PayloadLen)
if err != nil {
// 信号中断,继续读
if errors.Is(err, unix.EINTR) {
continue
}
// 出错返回
if !errors.Is(err, unix.EAGAIN) && !errors.Is(err, unix.EWOULDBLOCK) {
goto fail
}
// 缓冲区没有数据,等待可读
err = nil
break
}

// 读到eof,直接关闭
if n == 0 && len((*c.rbuf)[c.rw:]) > 0 {
c.closeWithLock(io.EOF)
c.OnClose(c, io.EOF)
err = io.EOF
goto fail
}

if n > 0 {
c.rw += n
if n > 0 {
c.rw += n
}

if len((*c.rbuf)[c.rw:]) == 0 {
// 说明缓存区已经满了。需要扩容
// 并且如果使用epoll ET mode,需要继续读取,直到返回EAGAIN, 不然会丢失数据
// 结合以上两种,缓存区满了就直接处理frame,解析出payload的长度,得到一个刚刚好的缓存区
if _, err = c.readHeader(); err != nil {
err = fmt.Errorf("read header err: %w", err)
goto fail
}
if _, err = c.readPayloadAndCallback(); err != nil {
err = fmt.Errorf("read header err: %w", err)
goto fail
}

// TODO
if len((*c.rbuf)[c.rw:]) == 0 {
// 说明缓存区已经满了。需要扩容
// 并且如果使用epoll ET mode,需要继续读取,直到返回EAGAIN, 不然会丢失数据
// 结合以上两种,缓存区满了就直接处理frame,解析出payload的长度,得到一个刚刚好的缓存区
if _, err := c.readHeader(); err != nil {
return 0, fmt.Errorf("read header err: %w", err)
}
if _, err := c.readPayloadAndCallback(); err != nil {
return 0, fmt.Errorf("read header err: %w", err)
}

// TODO
if len((*c.rbuf)[c.rw:]) == 0 {
//
// panic(fmt.Sprintf("需要扩容:rw(%d):rr(%d):currState(%v)", c.rw, c.rr, c.curState.String()))
}
continue
//
// panic(fmt.Sprintf("需要扩容:rw(%d):rr(%d):currState(%v)", c.rw, c.rr, c.curState.String()))
}
continue
}
}

for i := 0; ; i++ {
sucess, err := c.readHeader()
success, err = c.readHeader()
if err != nil {
return 0, fmt.Errorf("read header err: %w", err)
err = fmt.Errorf("read header err: %w", err)
goto fail
}

if !sucess {
return 0, nil
if !success {
goto success
}
sucess, err = c.readPayloadAndCallback()
success, err = c.readPayloadAndCallback()
if err != nil {
return 0, fmt.Errorf("read header err: %w", err)
err = fmt.Errorf("read payload err: %w", err)
goto fail
}

if !sucess {
return 0, nil
if !success {
goto success
}
}

success:
fail:
// 回收read buffer至内存池中
if err != nil || c.rbuf != nil && c.rr == c.rw {
c.rr, c.rw = 0, 0
bytespool.PutBytes(c.rbuf)
c.rbuf = nil
}
return err
}

func closeFd(fd int) {
Expand Down
3 changes: 2 additions & 1 deletion task_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package greatws

import (
"runtime"
"sync"
"sync/atomic"
)
Expand All @@ -31,7 +32,7 @@ func (t *taskStream) loop() {
}
}
func (t *taskStream) init() {
t.streamChan = make(chan func() bool, 3)
t.streamChan = make(chan func() bool, runtime.NumCPU())
go t.loop()
}

Expand Down

0 comments on commit a8db72b

Please sign in to comment.