Skip to content

Commit

Permalink
Dev (#11)
Browse files Browse the repository at this point in the history
* 优化

* 滑动窗口

* 更新

* 拆分task为全局和本地

* 调整下event loop结构

* 重构两个变量名

* 更新

* 更新

* OnOpen

* 更新

* 更新

* 更新
  • Loading branch information
guonaihong authored Jan 8, 2024
1 parent afc60d4 commit 20143a7
Show file tree
Hide file tree
Showing 35 changed files with 777 additions and 278 deletions.
108 changes: 79 additions & 29 deletions api_epoll.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 antlabs. All rights reserved.
// Copyright 2023-2024 antlabs. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,12 +31,51 @@ const (
EPOLLET = 0x80000000
)

const (
// 垂直触发
// 来自man 手册
// When used as an edge-triggered interface, for performance reasons,
// it is possible to add the file descriptor inside the epoll interface (EPOLL_CTL_ADD) once by specifying (EPOLLIN|EPOLLOUT).
// This allows you to avoid con‐
// tinuously switching between EPOLLIN and EPOLLOUT calling epoll_ctl(2) with EPOLL_CTL_MOD.
etRead = uint32(unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | unix.EPOLLOUT | EPOLLET)
etWrite = uint32(0)
etDelWrite = uint32(0)
etResetRead = uint32(0)

// 一次性触发
etReadOneShot = uint32(unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | unix.EPOLLOUT | EPOLLET | unix.EPOLLONESHOT)
etWriteOneShot = uint32(etReadOneShot)
etDelWriteOneShot = uint32(0)
etResetReadOneShot = uint32(etReadOneShot)
)

type epollState struct {
epfd int
events []unix.EpollEvent

et bool
parent *EventLoop
et bool
parent *EventLoop
rev uint32
wev uint32
dwEv uint32 // delete write event
resetEv uint32
}

func getReadWriteDeleteReset(oneShot bool, et bool) (uint32, uint32, uint32, uint32) {
if oneShot {
return etReadOneShot, etWriteOneShot, etDelWriteOneShot, etResetReadOneShot
}

if et {
return etRead, etWrite, etDelWrite, etResetRead
}

// if lt {
// return ltRead, ltWrite, ltDelWrite, ltResetRead
// }

return 0, 0, 0, 0
}

// 创建epoll handler
Expand All @@ -49,7 +88,7 @@ func apiEpollCreate(parent *EventLoop) (la linuxApi, err error) {

e.events = make([]unix.EpollEvent, 128)
e.parent = parent
e.et = true
e.rev, e.wev, e.dwEv, e.resetEv = getReadWriteDeleteReset(false, true)
return &e, nil
}

Expand All @@ -60,42 +99,53 @@ func (e *epollState) apiFree() {

// 新加读事件
func (e *epollState) addRead(c *Conn) error {
fd := int(c.getFd())
return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_ADD, fd, &unix.EpollEvent{
Fd: int32(fd),
// 来自man 手册
// When used as an edge-triggered interface, for performance reasons,
// it is possible to add the file descriptor inside the epoll interface (EPOLL_CTL_ADD) once by specifying (EPOLLIN|EPOLLOUT).
// This allows you to avoid con‐
// tinuously switching between EPOLLIN and EPOLLOUT calling epoll_ctl(2) with EPOLL_CTL_MOD.
Events: unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | unix.EPOLLOUT | EPOLLET,
})
if e.rev > 0 {
fd := int(c.getFd())
return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_ADD, fd, &unix.EpollEvent{
Fd: int32(fd),
Events: e.rev,
})
}
return nil
}

// 新加写事件
func (e *epollState) addWrite(c *Conn) error {
if e.et {
return nil
if e.wev > 0 {

fd := int(c.getFd())
return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_MOD, fd, &unix.EpollEvent{
Fd: int32(fd),
Events: e.wev,
})
}

fd := int(c.getFd())
return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_MOD, fd, &unix.EpollEvent{
Fd: int32(fd),
Events: unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | unix.EPOLLOUT,
})
return nil
}

// 删除写事件
func (e *epollState) delWrite(c *Conn) error {
if e.et {
return nil
if e.dwEv > 0 {

fd := int(c.getFd())
return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_MOD, fd, &unix.EpollEvent{
Fd: int32(fd),
Events: e.dwEv,
})
}
return nil
}

fd := int(c.getFd())
return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_MOD, fd, &unix.EpollEvent{
Fd: int32(fd),
Events: unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | EPOLLET,
})
// 重装添加读事件
func (e *epollState) resetRead(c *Conn) error {
fd := c.getFd()
if e.resetEv > 0 {
return unix.EpollCtl(e.epfd, unix.EPOLL_CTL_MOD, int(fd), &unix.EpollEvent{
Fd: int32(fd),
Events: e.resetEv,
})
}
return nil
}

// 删除事件
Expand Down Expand Up @@ -123,7 +173,7 @@ func (e *epollState) apiPoll(tv time.Duration) (retVal int, err error) {
numEvents = retVal
for i := 0; i < numEvents; i++ {
ev := &e.events[i]
conn := e.parent.parent.getConn(int(ev.Fd))
conn := e.parent.getConn(int(ev.Fd))
if conn == nil {
unix.Close(int(ev.Fd))
continue
Expand Down
2 changes: 1 addition & 1 deletion api_iouring.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (e *iouringState) getLogger() *slog.Logger {
}

func (e *iouringState) getConn(fd uint32) *Conn {
return e.parent.parent.getConn(int(fd))
return e.parent.getConn(int(fd))
}

// io-uring 处理事件的入口函数
Expand Down
54 changes: 27 additions & 27 deletions api_iouring_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,33 @@ import (
)

func (c *Conn) processRead(cqe *giouring.CompletionQueueEvent) error {
// 返回值小于等于0,表示读取完毕,关闭连接
if cqe.Res <= 0 {
c.closeWithLock(io.EOF)
c.getLogger().Debug("read res <= 0", "res", cqe.Res, "fd", c.fd)
return nil
}

c.getLogger().Debug("read res", "res", cqe.Res, "fd", c.fd)

// 处理websocket数据
c.rw += int(cqe.Res)
_, err := c.processWebsocketFrameOnlyIoUring()
if err != nil {
c.getLogger().Error("processWebsocketFrameOnlyIoUring", "err", err)
return err
}

parent := c.getParent()
if parent == nil {
c.processClose(cqe)
c.getLogger().Info("parent is nil", "close", c.closed)
return nil
}

if err := parent.addRead(c); err != nil {
return err
}
// // 返回值小于等于0,表示读取完毕,关闭连接
// if cqe.Res <= 0 {
// c.closeWithLock(io.EOF)
// c.getLogger().Debug("read res <= 0", "res", cqe.Res, "fd", c.fd)
// return nil
// }

// c.getLogger().Debug("read res", "res", cqe.Res, "fd", c.fd)

// // 处理websocket数据
// c.rw += int(cqe.Res)
// _, err := c.processWebsocketFrameOnlyIoUring()
// if err != nil {
// c.getLogger().Error("processWebsocketFrameOnlyIoUring", "err", err)
// return err
// }

// parent := c.getParent()
// if parent == nil {
// c.processClose(cqe)
// c.getLogger().Info("parent is nil", "close", c.closed)
// return nil
// }

// if err := parent.addRead(c); err != nil {
// return err
// }

return nil
}
Expand Down
12 changes: 2 additions & 10 deletions api_kqueue.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 antlabs. All rights reserved.
// Copyright 2023-2024 antlabs. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,6 @@ package greatws
import (
"errors"
"io"
"syscall"
"time"

"golang.org/x/sys/unix"
Expand Down Expand Up @@ -87,13 +86,6 @@ func (e *EventLoop) addWrite(c *Conn) error {
return e.trigger()
}

func (e *EventLoop) del(fd int) error {
e.mu.Lock()
e.apiState.changes = append(e.apiState.changes, unix.Kevent_t{Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_READ})
e.mu.Unlock()
return e.trigger()
}

func (e *EventLoop) apiPoll(tv time.Duration) (retVal int, err error) {
state := e.apiState

Expand Down Expand Up @@ -124,7 +116,7 @@ func (e *EventLoop) apiPoll(tv time.Duration) (retVal int, err error) {
ev := &state.events[j]
fd := int(ev.Ident)
// fmt.Printf("fd :%d, filter :%x, flags :%x\n", fd, ev.Filter, ev.Flags)
conn := e.parent.getConn(fd)
conn := e.getConn(fd)
if conn == nil {
unix.Close(fd)
continue
Expand Down
2 changes: 1 addition & 1 deletion api_linux.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023-2023 antlabs. All rights reserved.
// Copyright 2023-2024 antlabs. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
51 changes: 51 additions & 0 deletions autobahn/autobahn-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type handler struct {
m *greatws.MultiEventLoop
}

// 运行在业务线程
func (h *handler) echo(w http.ResponseWriter, r *http.Request) {
opts := []greatws.ServerOption{
greatws.WithServerReplyPing(),
Expand All @@ -74,6 +75,54 @@ func (h *handler) echo(w http.ResponseWriter, r *http.Request) {
_ = c
}

// 运行在io线程
func (h *handler) echoRunInIo(w http.ResponseWriter, r *http.Request) {
opts := []greatws.ServerOption{
greatws.WithServerReplyPing(),
greatws.WithServerDecompression(),
greatws.WithServerIgnorePong(),
greatws.WithServerCallback(&echoHandler{}),
greatws.WithServerEnableUTF8Check(),
greatws.WithServerReadTimeout(5 * time.Second),
greatws.WithServerMultiEventLoop(h.m),
greatws.WithServerCallbackInEventLoop(),
}

if *runInEventLoop {
opts = append(opts, greatws.WithServerCallbackInEventLoop())
}

c, err := greatws.Upgrade(w, r, opts...)
if err != nil {
slog.Error("Upgrade fail:", "err", err.Error())
}
_ = c
}

// 使用stream模式运行, 一个websocket一个go程
func (h *handler) echoRunStream(w http.ResponseWriter, r *http.Request) {
opts := []greatws.ServerOption{
greatws.WithServerReplyPing(),
greatws.WithServerDecompression(),
greatws.WithServerIgnorePong(),
greatws.WithServerCallback(&echoHandler{}),
greatws.WithServerEnableUTF8Check(),
greatws.WithServerReadTimeout(5 * time.Second),
greatws.WithServerMultiEventLoop(h.m),
greatws.WithServerStreamMode(),
greatws.WithServerCallbackInEventLoop(),
}

if *runInEventLoop {
opts = append(opts, greatws.WithServerCallbackInEventLoop())
}

c, err := greatws.Upgrade(w, r, opts...)
if err != nil {
slog.Error("Upgrade fail:", "err", err.Error())
}
_ = c
}
func main() {
flag.Parse()

Expand All @@ -97,6 +146,8 @@ func main() {
}()
mux := &http.ServeMux{}
mux.HandleFunc("/autobahn", h.echo)
mux.HandleFunc("/autobahn-io", h.echoRunInIo)
mux.HandleFunc("/autobahn-stream", h.echoRunStream)

rawTCP, err := net.Listen("tcp", ":9001")
if err != nil {
Expand Down
16 changes: 15 additions & 1 deletion autobahn/config/fuzzingclient.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,25 @@
"outdir": "./report/",
"servers": [
{
"agent": "non-tls",
"agent": "non-tls-taskbind",
"url": "ws://127.0.0.1:9001/autobahn",
"options": {
"version": 18
}
},
{
"agent": "non-tls-io",
"url": "ws://127.0.0.1:9001/autobahn-io",
"options": {
"version": 18
}
},
{
"agent": "non-tls-stream",
"url": "ws://127.0.0.1:9001/autobahn-stream",
"options": {
"version": 18
}
}
],
"cases": [
Expand Down
2 changes: 1 addition & 1 deletion callback.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 antlabs. All rights reserved.
// Copyright 2023-2024 antlabs. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 antlabs. All rights reserved.
// Copyright 2023-2024 antlabs. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion client_options.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 antlabs. All rights reserved.
// Copyright 2023-2024 antlabs. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
Loading

0 comments on commit 20143a7

Please sign in to comment.