Skip to content

Commit

Permalink
Merge pull request #172 from pion/sfu_send
Browse files Browse the repository at this point in the history
SFU serial send

Former-commit-id: 718693c
  • Loading branch information
jbrady42 authored May 12, 2020
2 parents 8e1e11c + 751bc01 commit 710bcbe
Showing 1 changed file with 45 additions and 27 deletions.
72 changes: 45 additions & 27 deletions pkg/rtc/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Router struct {
stop bool
liveTime time.Time
pluginChain *plugins.PluginChain
subChans map[string]chan *rtp.Packet
}

// NewRouter return a new Router
Expand All @@ -39,6 +40,7 @@ func NewRouter(id string) *Router {
subs: make(map[string]transport.Transport),
liveTime: time.Now().Add(liveCycle),
pluginChain: plugins.NewPluginChain(),
subChans: make(map[string]chan *rtp.Packet),
}
}

Expand Down Expand Up @@ -75,25 +77,17 @@ func (r *Router) start() {
continue
}
r.liveTime = time.Now().Add(liveCycle)
// nonblock sending
go func() {
for _, t := range r.GetSubs() {
if t == nil {
log.Errorf("Transport is nil")
continue
}

// log.Infof(" WriteRTP %v:%v to %v ", pkt.SSRC, pkt.SequenceNumber, t.ID())
if err := t.WriteRTP(pkt); err != nil {
// log.Errorf("wt.WriteRTP err=%v", err)
// del sub when err is increasing
if t.WriteErrTotal() > maxWriteErr {
r.DelSub(t.ID())
}
}
t.WriteErrReset()
r.subLock.RLock()
// Push to client send queues
for i, _ := range r.GetSubs() {
// Nonblock sending
select {
case r.subChans[i] <- pkt:
default:
log.Errorf("Sub consumer is backed up. Dropping packet")
}
}()
}
r.subLock.RUnlock()
}
}()
}
Expand Down Expand Up @@ -142,7 +136,25 @@ func (r *Router) AddSub(id string, t transport.Transport) transport.Transport {
r.subLock.Lock()
defer r.subLock.Unlock()
r.subs[id] = t
r.subChans[id] = make(chan *rtp.Packet, 1000)
log.Infof("Router.AddSub id=%s t=%p", id, t)
// Sub writer
go func() {
for pkt := range r.subChans[id] {
// log.Infof(" WriteRTP %v:%v to %v ", pkt.SSRC, pkt.SequenceNumber, t.ID())
if err := t.WriteRTP(pkt); err != nil {
// log.Errorf("wt.WriteRTP err=%v", err)
// del sub when err is increasing
if t.WriteErrTotal() > maxWriteErr {
r.DelSub(t.ID())
}
}
t.WriteErrReset()
}
log.Infof("Closing sub writer")
}()

// Sub reader
go func() {
for {
pkt := <-t.GetRTCPChan()
Expand Down Expand Up @@ -182,8 +194,8 @@ func (r *Router) AddSub(id string, t transport.Transport) transport.Transport {

// GetSub get a sub by id
func (r *Router) GetSub(id string) transport.Transport {
r.subLock.Lock()
defer r.subLock.Unlock()
r.subLock.RLock()
defer r.subLock.RUnlock()
// log.Infof("Router.GetSub id=%s sub=%v", id, r.subs[id])
return r.subs[id]
}
Expand Down Expand Up @@ -213,20 +225,26 @@ func (r *Router) DelSub(id string) {
if r.subs[id] != nil {
r.subs[id].Close()
}
if r.subChans[id] != nil {
close(r.subChans[id])
}
delete(r.subs, id)
delete(r.subChans, id)
}

// DelSubs del all sub
func (r *Router) DelSubs() {
log.Infof("Router.DelSubs")
r.subLock.Lock()
defer r.subLock.Unlock()
for _, sub := range r.subs {
if sub != nil {
sub.Close()
}
r.subLock.RLock()
keys := make([]string, 0, len(r.subs))
for k := range r.subs {
keys = append(keys, k)
}
r.subLock.RUnlock()

for _, id := range keys {
r.DelSub(id)
}
r.subs = nil
}

// Close release all
Expand Down

0 comments on commit 710bcbe

Please sign in to comment.