Skip to content

Commit

Permalink
Merge pull request #14 from antlabs/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
guonaihong authored Jan 23, 2024
2 parents a0a462a + 8ab5826 commit e5aab12
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 52 deletions.
40 changes: 40 additions & 0 deletions .github/workflows/autobahn.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name: Autobahn

on:
push:
branches:
- master
- dev
pull_request:
branches:
- master
- dev

jobs:
Autobahn:
strategy:
matrix:
os: [ ubuntu-latest ]
go: [ 1.18.x ]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go }}
- name: Autobahn Test
env:
CRYPTOGRAPHY_ALLOW_OPENSSL_102: yes
run: |
chmod +x ./autobahn/script/run.sh & ./autobahn/script/run.sh
- name: Autobahn Report Artifact
if: >-
startsWith(matrix.os, 'ubuntu')
uses: actions/upload-artifact@v2

with:
name: autobahn report ${{ matrix.go }} ${{ matrix.os }}
path: autobahn/report
retention-days: 7
51 changes: 51 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
name: Go

on:
push:
pull_request:

jobs:

build:
runs-on: ubuntu-latest
strategy:
matrix:
go: [ '1.20']
name: Go ${{ matrix.go }} sample

steps:

- name: Set up Go 1.20
uses: actions/setup-go@v1
with:
go-version: 1.20
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v1

- name: Get dependencies
run: |
go get -v -t -d ./...
if [ -f Gopkg.toml ]; then
curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
dep ensure
fi
- name: Build
run: go build -v .

# - name: Test-386
# run: env GOARCH=386 go test -test.run=Test_Retry_sleep -v
#run: env GOARCH=386 go test -v -coverprofile='coverage.out' -covermode=count ./...

- name: Test-Race
run: env GOARCH=amd64 go test -v -race ./...

- name: Test-amd64
run: env GOARCH=amd64 go test -v -coverprofile='coverage.out' -covermode=count ./...

- name: Upload Coverage report
uses: codecov/codecov-action@v1
with:
token: ${{secrets.CODECOV_TOKEN}}
file: ./coverage.out
10 changes: 2 additions & 8 deletions api_epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,20 @@ import (
"golang.org/x/sys/unix"
)

const (

// EPOLLET .
EPOLLET = 0x80000000
)

const (
// 垂直触发
// 来自man 手册
// When used as an edge-triggered interface, for performance reasons,
// it is possible to add the file descriptor inside the epoll interface (EPOLL_CTL_ADD) once by specifying (EPOLLIN|EPOLLOUT).
// This allows you to avoid con‐
// tinuously switching between EPOLLIN and EPOLLOUT calling epoll_ctl(2) with EPOLL_CTL_MOD.
etRead = uint32(unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | unix.EPOLLOUT | EPOLLET)
etRead = uint32(unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | unix.EPOLLOUT | unix.EPOLLET)
etWrite = uint32(0)
etDelWrite = uint32(0)
etResetRead = uint32(0)

// 一次性触发
etReadOneShot = uint32(unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | unix.EPOLLOUT | EPOLLET | unix.EPOLLONESHOT)
etReadOneShot = uint32(unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP | unix.EPOLLPRI | unix.EPOLLIN | unix.EPOLLOUT | unix.EPOLLET | unix.EPOLLONESHOT)
etWriteOneShot = uint32(etReadOneShot)
etDelWriteOneShot = uint32(0)
etResetReadOneShot = uint32(etReadOneShot)
Expand Down
60 changes: 21 additions & 39 deletions api_kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ func (e *EventLoop) apiCreate(flag evFlag) (err error) {
e.apiState = &state
e.apiState.events = make([]unix.Kevent_t, 1024)

_, err = unix.Kevent(state.kqfd, []unix.Kevent_t{{
Ident: 0,
Filter: unix.EVFILT_USER,
Flags: unix.EV_ADD | unix.EV_CLEAR,
}}, nil, nil)
return err
}

Expand All @@ -54,71 +49,58 @@ func (e *EventLoop) apiFree() {
}
}

// 在另外一个线程唤醒kqueue
func (e *EventLoop) trigger() (err error) {
_, err = unix.Kevent(e.apiState.kqfd, []unix.Kevent_t{{Ident: 0, Filter: unix.EVFILT_USER, Fflags: unix.NOTE_TRIGGER}}, nil, nil)
return err
}

// 新加读事件
func (e *EventLoop) addRead(c *Conn) error {
e.mu.Lock()
fd := c.getFd()
e.apiState.changes = append(e.apiState.changes, unix.Kevent_t{Ident: uint64(fd), Filter: unix.EVFILT_READ, Flags: unix.EV_ADD | unix.EV_CLEAR})
e.mu.Unlock()
return e.trigger()
if fd == -1 {
return nil
}

_, err := unix.Kevent(e.kqfd, []unix.Kevent_t{
{Ident: uint64(fd), Flags: unix.EV_ADD, Filter: unix.EVFILT_READ},
{Ident: uint64(fd), Flags: unix.EV_ADD, Filter: unix.EVFILT_WRITE},
}, nil, nil)
return err
}

func (e *EventLoop) delWrite(c *Conn) (err error) {
e.mu.Lock()
fd := c.getFd()
e.apiState.changes = append(e.apiState.changes, unix.Kevent_t{Ident: uint64(fd), Filter: unix.EVFILT_WRITE, Flags: unix.EV_DELETE | unix.EV_CLEAR})
e.mu.Unlock()
return e.trigger()
return nil
}

// 新加写事件
func (e *EventLoop) addWrite(c *Conn) error {
e.mu.Lock()
fd := c.getFd()
e.apiState.changes = append(e.apiState.changes, unix.Kevent_t{Ident: uint64(fd), Filter: unix.EVFILT_WRITE, Flags: unix.EV_ADD | unix.EV_CLEAR})
e.mu.Unlock()
return e.trigger()
return nil
}

func (e *EventLoop) apiPoll(tv time.Duration) (retVal int, err error) {
state := e.apiState

var changes []unix.Kevent_t
e.mu.Lock()
changes = e.apiState.changes
e.apiState.changes = nil
e.mu.Unlock()
var timeout *unix.Timespec
if tv >= 0 {
var timeout unix.Timespec
timeout.Sec = int64(tv / time.Second)
timeout.Nsec = int64(tv % time.Second)

retVal, err = unix.Kevent(state.kqfd, changes, state.events, &timeout)
} else {
retVal, err = unix.Kevent(state.kqfd, changes, state.events, nil)
var tempTimeout unix.Timespec
tempTimeout.Sec = int64(tv / time.Second)
tempTimeout.Nsec = int64(tv % time.Second)
timeout = &tempTimeout
}

retVal, err = unix.Kevent(state.kqfd, nil, state.events, timeout)
if err != nil {
if errors.Is(err, unix.EINTR) {
return 0, nil
}
return 0, err
}

// fmt.Printf("有新的事件发生 %d, err :%v\n", retVal, err)
if retVal > 0 {
for j := 0; j < retVal; j++ {
ev := &state.events[j]
fd := int(ev.Ident)
// fmt.Printf("fd :%d, filter :%x, flags :%x\n", fd, ev.Filter, ev.Flags)

conn := e.getConn(fd)
if conn == nil {

unix.Close(fd)
e.parent.Logger.Debug("conn is nil", "fd", fd)
continue
}

Expand Down
12 changes: 7 additions & 5 deletions conn_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,13 @@ func duplicateSocket(socketFD int) (int, error) {
}

func (c *Conn) closeInner(err error) {
fd := c.getFd()
c.getLogger().Debug("close conn", slog.Int64("fd", int64(fd)))
c.parent.del(c)
atomic.StoreInt64(&c.fd, -1)
atomic.StoreInt32(&c.closed, 1)
c.closeOnce.Do(func() {
fd := c.getFd()
c.getLogger().Debug("close conn", slog.Int64("fd", int64(fd)))
c.parent.del(c)
atomic.StoreInt64(&c.fd, -1)
atomic.StoreInt32(&c.closed, 1)
})
}

func (c *Conn) closeWithLock(err error) {
Expand Down

0 comments on commit e5aab12

Please sign in to comment.