Skip to content

Commit

Permalink
* add p2p multi-processes mode
Browse files Browse the repository at this point in the history
* reduce compilation time
* reduce the number of threads in process mode
  • Loading branch information
vyloy committed Dec 26, 2023
1 parent 7bd656c commit fcd5c00
Show file tree
Hide file tree
Showing 22 changed files with 4,507 additions and 22 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/container.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ jobs:

- name: package-server
run: |-
sudo apt-get update && sudo apt-get install make git gn ninja-build python3 python3-pip libgtk-3-dev gcc-aarch64-linux-gnu g++-aarch64-linux-gnu gcc-x86-64-linux-gnu g++-x86-64-linux-gnu -y
sudo apt-get update && sudo apt-get install make git gn ninja-build python3 python3-pip libgtk-3-dev gcc-aarch64-linux-gnu g++-aarch64-linux-gnu gcc-x86-64-linux-gnu g++-x86-64-linux-gnu rustc -y
rustup target add x86_64-unknown-linux-gnu && rustup target add aarch64-unknown-linux-gnu
GOARCH=amd64 make release_server && mv release/linux-amd64-server release/linux-x86_64-server
TARGET=aarch64-linux-gnu GOOS=linux GOARCH=arm64 make release_server && mv release/linux-arm64-server release/linux-aarch64-server
ls -al release
- name: package-client
run: |-
sudo apt-get update && sudo apt-get install make git gn ninja-build python3 python3-pip libgtk-3-dev gcc-aarch64-linux-gnu g++-aarch64-linux-gnu gcc-x86-64-linux-gnu g++-x86-64-linux-gnu -y
sudo apt-get update && sudo apt-get install make git gn ninja-build python3 python3-pip libgtk-3-dev gcc-aarch64-linux-gnu g++-aarch64-linux-gnu gcc-x86-64-linux-gnu g++-x86-64-linux-gnu rustc -y
rustup target add x86_64-unknown-linux-gnu && rustup target add aarch64-unknown-linux-gnu
GOARCH=amd64 make release_client && mv release/linux-amd64-client release/linux-x86_64-client
TARGET=aarch64-linux-gnu GOOS=linux GOARCH=arm64 make release_client && mv release/linux-arm64-client release/linux-aarch64-client
ls -al release
Expand Down
2 changes: 2 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
url = https://git.mirror.iscas.ac.cn/ao-space/google-webrtc.git
branch = main
shallow = true
ignore = dirty
[submodule "dep/_msquic"]
path = dep/_msquic
url = https://github.com/microsoft/msquic
ignore = dirty
6 changes: 6 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,9 @@ RUN sed -i 's/egrep -q "i686|x86_64"/true/g' /root/build/install-build-deps.sh &
rm -rf /root/build

RUN cp -r /usr/x86_64-linux-gnu/lib/* /usr/lib/x86_64-linux-gnu/

RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y && \
rustup target add x86_64-unknown-linux-musl && \
rustup target add aarch64-unknown-linux-musl && \
rustup target add x86_64-unknown-linux-gnu && \
rustup target add aarch64-unknown-linux-gnu
18 changes: 13 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ TARGET_OS=$(shell echo $(TARGET) | awk -F '-' '{print $$2}')
ifeq ($(TARGET_OS), native)
TARGET_OS=
endif
ifeq ($(TARGET_OS), linux)
RUST_TARGET?=$(shell echo $(TARGET) | awk -F '-' '{print $$1 "-unknown-" $$2 "-" $$3}')
endif
TARGET_CPU=$(shell echo $(TARGET) | awk -F '-' '{print $$1}')
ifeq ($(TARGET_CPU), native)
TARGET_CPU=
Expand All @@ -64,6 +67,7 @@ export CGO_CXXFLAGS=-I$(PWD)/dep/_google-webrtc/src \
-std=c++17 -DWEBRTC_POSIX -DQUIC_API_ENABLE_PREVIEW_FEATURES
export CGO_LDFLAGS= $(PWD)/dep/_google-webrtc/src/out/release-$(TARGET)/obj/libwebrtc.a \
$(PWD)/dep/_msquic/$(TARGET)/bin/Release/libmsquic.a \
$(PWD)/dep/p2p/target/$(RUST_TARGET)/release/libp2p.a \
-ldl -pthread
export CGO_ENABLED=1

Expand Down Expand Up @@ -168,19 +172,19 @@ docker_release_linux_arm64_server: docker_create_image

build: build_server build_client
release: release_server release_client
build_client: $(SOURCES) Makefile compile_msquic compile_webrtc
build_client: $(SOURCES) Makefile compile_msquic compile_webrtc compile_p2p
$(eval CGO_CXXFLAGS+=-O0 -g -ggdb)
$(eval NAME=$(GOOS)-$(GOARCH)-client)
go build $(DEBUG_OPTIONS) -o build/$(NAME)$(EXE) ./cmd/client
release_client: $(SOURCES) Makefile compile_msquic compile_webrtc release_web_client
release_client: $(SOURCES) Makefile compile_msquic compile_webrtc compile_p2p release_web_client
$(eval CGO_CXXFLAGS+=-O3)
$(eval NAME=$(GOOS)-$(GOARCH)-client)
go build $(RELEASE_OPTIONS) -o release/$(NAME)$(EXE) ./cmd/client
build_server: $(SOURCES) Makefile compile_msquic compile_webrtc
build_server: $(SOURCES) Makefile compile_msquic compile_webrtc compile_p2p
$(eval CGO_CXXFLAGS+=-O0 -g -ggdb)
$(eval NAME=$(GOOS)-$(GOARCH)-server)
go build $(DEBUG_OPTIONS) -o build/$(NAME)$(EXE) ./cmd/server
release_server: $(SOURCES) Makefile compile_msquic compile_webrtc release_web_server
release_server: $(SOURCES) Makefile compile_msquic compile_webrtc compile_p2p release_web_server
$(eval CGO_CXXFLAGS+=-O3)
$(eval NAME=$(GOOS)-$(GOARCH)-server)
go build $(RELEASE_OPTIONS) -o release/$(NAME)$(EXE) ./cmd/server
Expand Down Expand Up @@ -277,4 +281,8 @@ compile_msquic: check_msquic_dependencies update_submodule
cmake -B./dep/_msquic/$(TARGET) -S./dep/_msquic -DQUIC_BUILD_SHARED=OFF -DCMAKE_TARGET_ARCHITECTURE=$(TARGET_CPU)
make -C./dep/_msquic/$(TARGET) -j$(shell nproc)
@renameSymbols=$$(objdump -t ./dep/_msquic/$(TARGET)/bin/Release/libmsquic.a | awk -v RS= '/_YB80VJ/{next}1' | grep -E 'g +(F|O) ' | grep -Evi ' (ms){0,1}quic' | awk '{print " --redefine-sym " $$NF "=" $$NF "_YB80VJ"}') && \
$(TARGET)-objcopy $$renameSymbols ./dep/_msquic/$(TARGET)/bin/Release/libmsquic.a
$(TARGET)-objcopy $$renameSymbols ./dep/_msquic/$(TARGET)/bin/Release/libmsquic.a

compile_p2p:
cd ./dep/p2p && \
cargo build -r --target $(RUST_TARGET)
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func New(args []string, out io.Writer) (c *Client, err error) {
c = &Client{
Logger: l,
tunnels: make(map[*conn]struct{}),
peers: make(map[uint32]*peerTask),
peers: make(map[uint32]PeerTask),
}
c.config.Store(&conf)
c.tunnelsCond = sync.NewCond(c.tunnelsRWMtx.RLocker())
Expand Down
6 changes: 5 additions & 1 deletion client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/url"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/isrc-cas/gt/config"
Expand Down Expand Up @@ -66,10 +67,11 @@ type Options struct {
SentryDebug bool `yaml:"sentryDebug,omitempty" json:",omitempty" usage:"Sentry debug mode, the debug information is printed to help you understand what sentry is doing"`

WebRTCConnectionIdleTimeout config.Duration `yaml:"webrtcConnectionIdleTimeout,omitempty" usage:"The timeout of WebRTC connection. Supports values like '30s', '5m'"`
WebRTCRemoteConnections uint `yaml:"webrtcConnections" usage:"The max number of webrtc connections. Valid value is 1 to 50"`
WebRTCRemoteConnections uint `yaml:"webrtcConnections" usage:"The max number of webrtc connections. Valid value is 1 to 50"`
WebRTCLogLevel string `yaml:"webrtcLogLevel,omitempty" json:",omitempty" usage:"WebRTC log level: verbose, info, warning, error"`
WebRTCMinPort uint16 `yaml:"webrtcMinPort,omitempty" json:",omitempty" usage:"The min port of WebRTC peer connection"`
WebRTCMaxPort uint16 `yaml:"webrtcMaxPort,omitempty" json:",omitempty" usage:"The max port of WebRTC peer connection"`
WebRTCThread bool `yaml:"webrtcThreadMode,omitempty" json:",omitempty" usage:"Use thread mode of WebRTC peer connection"`

TCPForwardAddr string `yaml:"tcpForwardAddr,omitempty" json:",omitempty" usage:"The address of TCP forward"`
TCPForwardHostPrefix string `yaml:"tcpForwardHostPrefix,omitempty" json:",omitempty" usage:"The host prefix of TCP forward"`
Expand Down Expand Up @@ -168,6 +170,8 @@ type service struct {
LocalURL clientURL `yaml:"local,omitempty" json:",omitempty"`
LocalTimeout config.Duration `yaml:"localTimeout,omitempty" json:",omitempty"`
UseLocalAsHTTPHost bool `yaml:"useLocalAsHTTPHost,omitempty" json:",omitempty"`

remoteTCPPort atomic.Uint32 `yaml:"-" json:"-"`
}

func (s *service) String() string {
Expand Down
62 changes: 54 additions & 8 deletions client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,10 @@ func (c *conn) processData(taskID uint32, r *bufio.LimitedReader) (readErr, writ
})
return
}
_, err := r.WriteTo(pt.apiConn.PipeWriter)
_, err := r.WriteTo(pt.APIWriter())
if err != nil {
pt.Logger.Error().Err(err).Msg("processP2P WriteTo failed")
c.Logger.Info().
Uint32("peerTask", taskID).Msg("got closed because task with same id is received")
}
return
}
Expand Down Expand Up @@ -494,17 +495,61 @@ func (c *conn) processData(taskID uint32, r *bufio.LimitedReader) (readErr, writ
}

func (c *conn) processP2P(id uint32, r *bufio.LimitedReader) {
t, ok := c.newPeerTask(id)
var t PeerTask
var ok bool
if c.client.Config().WebRTCThread {
t, ok = c.newPeerTask(id)
} else {
t, ok = c.newPeerProcTask(id)
}
if !ok {
return
}

c.client.apiServer.Listener.AcceptCh() <- t.apiConn
t.Logger.Info().Msg("peer task started")
_, err := r.WriteTo(t.apiConn.PipeWriter)
c.client.apiServer.Listener.AcceptCh() <- t.APIConn()
c.Logger.Info().
Uint32("peerTask", id).Msg("peer task started")
_, err := r.WriteTo(t.APIWriter())
if err != nil {
t.Logger.Error().Err(err).Msg("processP2P WriteTo failed")
c.Logger.Error().
Uint32("peerTask", id).Err(err).Msg("processP2P WriteTo failed")
}
}

func (c *conn) newPeerProcTask(id uint32) (t *peerProcessTask, ok bool) {
c.client.peersRWMtx.Lock()
defer c.client.peersRWMtx.Unlock()
if !predef.Debug {
l := uint(len(c.client.peers))
if l >= c.client.Config().WebRTCRemoteConnections {
respAndClose(id, c, [][]byte{
[]byte("HTTP/1.1 403 Forbidden\r\nConnection: Closed\r\n\r\n"),
})
return
}
}

t = &peerProcessTask{}
t.id = id
t.tunnel = c
t.apiConn = api.NewConn(id, "", c)
t.apiConn.ProcessOffer = t.processOffer
t.apiConn.GetOffer = t.getOffer
t.apiConn.ProcessAnswer = t.processAnswer
t.data = pool.BytesPool.Get().([]byte)
t.Logger = c.Logger.With().
Uint32("peerTask", id).
Logger()

ot, ok := c.client.peers[id]
if ok && ot != nil {
ot.CloseWithLock()
c.Logger.Info().
Uint32("peerTask", id).Msg("got closed because task with same id is received")
}
c.client.peers[id] = t
ok = true
return
}

func (c *conn) newPeerTask(id uint32) (t *peerTask, ok bool) {
Expand Down Expand Up @@ -542,7 +587,8 @@ func (c *conn) newPeerTask(id uint32) (t *peerTask, ok bool) {
ot, ok := c.client.peers[id]
if ok && ot != nil {
ot.CloseWithLock()
ot.Logger.Info().Msg("got closed because task with same id is received")
c.Logger.Info().
Uint32("peerTask", id).Msg("got closed because task with same id is received")
}
c.client.peers[id] = t
ok = true
Expand Down
2 changes: 1 addition & 1 deletion client/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Client struct {
closing uint32
tunnels map[*conn]struct{}
tunnelsRWMtx sync.RWMutex
peers map[uint32]*peerTask
peers map[uint32]PeerTask
peersRWMtx sync.RWMutex
tunnelsCond *sync.Cond
idleManager *idleManager
Expand Down
10 changes: 9 additions & 1 deletion client/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ type peerTask struct {
waitNegotiationNeeded chan struct{}
}

func (pt *peerTask) APIConn() *api.Conn {
return pt.apiConn
}

func (pt *peerTask) APIWriter() *std.PipeWriter {
return pt.apiConn.PipeWriter
}

func (pt *peerTask) OnSignalingChange(state webrtc.SignalingState) {
pt.Logger.Info().Str("state", state.String()).Msg("signaling state changed")
}
Expand Down Expand Up @@ -492,7 +500,7 @@ func (pt *peerTask) processAnswer(r *http.Request, writer http.ResponseWriter) {
err = errors.New("invalid task id")
return
}
task = pt
task = pt.(*peerTask)
}
task.process(r.Body, writer, func() (err error) {
var answer webrtc.SessionDescription
Expand Down
Loading

0 comments on commit fcd5c00

Please sign in to comment.