Skip to content

Commit

Permalink
fix(sfu): remove race condition for session & peer (#544)
Browse files Browse the repository at this point in the history
* fix(sfu): race condition for session & peer

* fix(sfu): mutex protect subscriber DataChannel
  • Loading branch information
hn8 authored Jul 17, 2021
1 parent 62bac79 commit ae24cc7
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 22 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand Down Expand Up @@ -379,7 +378,6 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
Expand Down
7 changes: 3 additions & 4 deletions pkg/sfu/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ const (

type atomicBool int32

func (a *atomicBool) set(value bool) {
var i int32
func (a *atomicBool) set(value bool) (swapped bool) {
if value {
i = 1
return atomic.SwapInt32((*int32)(a), 1) == 0
}
atomic.StoreInt32((*int32)(a), i)
return atomic.SwapInt32((*int32)(a), 0) == 1
}

func (a *atomicBool) get() bool {
Expand Down
4 changes: 3 additions & 1 deletion pkg/sfu/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ func (p *PeerLocal) Close() error {
p.Lock()
defer p.Unlock()

p.closed.set(true)
if !p.closed.set(true) {
return nil
}

if p.session != nil {
p.session.RemovePeer(p)
Expand Down
3 changes: 1 addition & 2 deletions pkg/sfu/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,8 @@ func (p *Publisher) Relay(ice []webrtc.ICEServer) (*relay.Peer, error) {
}
}
p.relayPeer = append(p.relayPeer, rp)

go p.relayReports(rp)
p.mu.Unlock()
go p.relayReports(rp)
})

if err = rp.Offer(p.cfg.Relay); err != nil {
Expand Down
34 changes: 24 additions & 10 deletions pkg/sfu/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,18 @@ func (s *SessionLocal) AddPeer(peer Peer) {

// RemovePeer removes a transport from the SessionLocal
func (s *SessionLocal) RemovePeer(p Peer) {
pid := p.ID()
Logger.V(0).Info("RemovePeer from SessionLocal", "peer_id", pid, "session_id", s.id)
s.mu.Lock()
Logger.V(0).Info("RemovePeer from SessionLocal", "peer_id", p.ID(), "session_id", s.id)
delete(s.peers, p.ID())
if s.peers[pid] == p {
delete(s.peers, pid)
}
peerCount := len(s.peers)
s.mu.Unlock()

// Close SessionLocal if no peers
if len(s.peers) == 0 && s.onCloseHandler != nil && !s.closed.get() {
s.closed.set(true)
s.onCloseHandler()
if peerCount == 0 {
s.Close()
}
}

Expand All @@ -101,15 +104,16 @@ func (s *SessionLocal) AddDatachannel(owner string, dc *webrtc.DataChannel) {

s.mu.Lock()
s.fanOutDCs = append(s.fanOutDCs, label)
s.peers[owner].Subscriber().channels[label] = dc
peerOwner := s.peers[owner]
peers := make([]Peer, 0, len(s.peers))
for _, p := range s.peers {
if p.ID() == owner || p.Subscriber() == nil {
if p == peerOwner || p.Subscriber() == nil {
continue
}
peers = append(peers, p)
}
s.mu.Unlock()
peerOwner.Subscriber().RegisterDatachannel(label, dc)

dc.OnMessage(func(msg webrtc.DataChannelMessage) {
s.onMessage(owner, label, msg)
Expand Down Expand Up @@ -207,6 +211,15 @@ func (s *SessionLocal) OnClose(f func()) {
s.onCloseHandler = f
}

func (s *SessionLocal) Close() {
if !s.closed.set(true) {
return
}
if s.onCloseHandler != nil {
s.onCloseHandler()
}
}

func (s *SessionLocal) setRelayedDatachannel(peerID string, datachannel *webrtc.DataChannel) {
label := datachannel.Label()
for _, dc := range s.datachannels {
Expand Down Expand Up @@ -292,17 +305,18 @@ func (s *SessionLocal) onMessage(origin, label string, msg webrtc.DataChannelMes
}
}

func (s *SessionLocal) GetDataChannels(origin, label string) (dcs []*webrtc.DataChannel) {
func (s *SessionLocal) GetDataChannels(origin, label string) []*webrtc.DataChannel {
s.mu.RLock()
defer s.mu.RUnlock()
dcs := make([]*webrtc.DataChannel, 0, len(s.peers))
for pid, p := range s.peers {
if origin == pid || p.Subscriber() == nil {
continue
}

if dc, ok := p.Subscriber().channels[label]; ok && dc.ReadyState() == webrtc.DataChannelStateOpen {
if dc := p.Subscriber().DataChannel(label); dc != nil && dc.ReadyState() == webrtc.DataChannelStateOpen {
dcs = append(dcs, dc)
}
}
return
return dcs
}
8 changes: 6 additions & 2 deletions pkg/sfu/sfu.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,12 @@ func (s *SFU) NewDatachannel(label string) *Datachannel {
}

// GetSessions return all sessions
func (s *SFU) GetSessions() map[string]Session {
func (s *SFU) GetSessions() []Session {
s.RLock()
defer s.RUnlock()
return s.sessions
sessions := make([]Session, 0, len(s.sessions))
for _, session := range s.sessions {
sessions = append(sessions, session)
}
return sessions
}
6 changes: 5 additions & 1 deletion pkg/sfu/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func (s *Subscriber) AddDatachannel(peer Peer, dc *Datachannel) error {

// DataChannel returns the channel for a label
func (s *Subscriber) DataChannel(label string) *webrtc.DataChannel {
s.RLock()
defer s.RUnlock()
return s.channels[label]
}

Expand Down Expand Up @@ -204,11 +206,13 @@ func (s *Subscriber) SetRemoteDescription(desc webrtc.SessionDescription) error
}

func (s *Subscriber) RegisterDatachannel(label string, dc *webrtc.DataChannel) {
s.Lock()
s.channels[label] = dc
s.Unlock()
}

func (s *Subscriber) GetDatachannel(label string) *webrtc.DataChannel {
return s.channels[label]
return s.DataChannel(label)
}

func (s *Subscriber) GetDownTracks(streamID string) []*DownTrack {
Expand Down

0 comments on commit ae24cc7

Please sign in to comment.