Skip to content

Commit

Permalink
[QBOX] Support TrackLocal RTX (pion#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
aggresss authored Feb 29, 2024
1 parent 2828381 commit 62e4e71
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 8 deletions.
24 changes: 24 additions & 0 deletions rtpcodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package webrtc

import (
"fmt"
"regexp"
"strings"

"github.com/pion/webrtc/v3/internal/fmtp"
Expand Down Expand Up @@ -121,3 +123,25 @@ func codecParametersFuzzySearch(needle RTPCodecParameters, haystack []RTPCodecPa

return RTPCodecParameters{}, codecMatchNone
}

// Do a fuzzy find for a associated codec in the list of codecs
// Used for lookup up a associated codec in an existing list to find a match
// Returns codecMatchExact, codecMatchPartial, or codecMatchNone
func codecParametersAssociatedSearch(needle RTPCodecParameters, haystack []RTPCodecParameters) (RTPCodecParameters, codecMatchType) {
// First attempt to match Exact
for _, c := range haystack {
if c.SDPFmtpLine == fmt.Sprintf("apt=%d", needle.PayloadType) {
return c, codecMatchExact
}
}

// Fallback to just has apt codec
re := regexp.MustCompile(`^apt=\d+$`)
for _, c := range haystack {
if re.MatchString(c.SDPFmtpLine) {
return c, codecMatchPartial
}
}

return RTPCodecParameters{}, codecMatchNone
}
95 changes: 89 additions & 6 deletions rtpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ import (
)

type trackEncoding struct {
track TrackLocal

srtpStream *srtpWriterFuture
track TrackLocal
context *baseTrackLocalContext

ssrc SSRC
srtpStream *srtpWriterFuture
rtcpInterceptor interceptor.RTCPReader
streamInfo interceptor.StreamInfo

context *baseTrackLocalContext

ssrc SSRC
rtxSsrc SSRC
rtxSrtpStream *srtpWriterFuture
rtxRtcpInterceptor interceptor.RTCPReader
rtxStreamInfo interceptor.StreamInfo
}

// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer
Expand Down Expand Up @@ -122,6 +124,7 @@ func (r *RTPSender) getParameters() RTPSendParameters {
RID: rid,
SSRC: trackEncoding.ssrc,
PayloadType: r.payloadType,
RTX: RTPRtxParameters{SSRC: trackEncoding.rtxSsrc},
},
})
}
Expand Down Expand Up @@ -210,6 +213,16 @@ func (r *RTPSender) addEncoding(track TrackLocal) {
}),
)

if r.api.settingEngine.trackLocalRtx {
codecs := r.api.mediaEngine.getCodecsByKind(track.Kind())
for _, c := range codecs {
if _, matchType := codecParametersAssociatedSearch(c, codecs); matchType != codecMatchNone {
trackEncoding.rtxSsrc = SSRC(randutil.NewMathRandomGenerator().Uint32())
break
}
}
}

r.trackEncodings = append(r.trackEncodings, trackEncoding)
}

Expand Down Expand Up @@ -332,6 +345,37 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error {
}),
)
writeStream.interceptor.Store(rtpInterceptor)

if rtxCodec, matchType := codecParametersAssociatedSearch(codec, r.api.mediaEngine.getCodecsByKind(r.kind)); matchType == codecMatchExact &&
parameters.Encodings[idx].RTX.SSRC != 0 {
rtxSrtpStream := &srtpWriterFuture{ssrc: parameters.Encodings[idx].RTX.SSRC, rtpSender: r}

trackEncoding.rtxSrtpStream = rtxSrtpStream
trackEncoding.rtxSsrc = parameters.Encodings[idx].RTX.SSRC

trackEncoding.rtxStreamInfo = *createStreamInfo(
r.id+"_rtx",
parameters.Encodings[idx].RTX.SSRC,
rtxCodec.PayloadType,
rtxCodec.RTPCodecCapability,
parameters.HeaderExtensions,
)
trackEncoding.rtxStreamInfo.Attributes.Set("apt_ssrc", uint32(parameters.Encodings[idx].SSRC))

trackEncoding.rtxRtcpInterceptor = r.api.interceptor.BindRTCPReader(
interceptor.RTCPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
n, err = trackEncoding.rtxSrtpStream.Read(in)
return n, a, err
}),
)

r.api.interceptor.BindLocalStream(
&trackEncoding.rtxStreamInfo,
interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, _ interceptor.Attributes) (int, error) {
return rtxSrtpStream.WriteRTP(header, payload)
}),
)
}
}

close(r.sendCalled)
Expand Down Expand Up @@ -361,7 +405,16 @@ func (r *RTPSender) Stop() error {
errs := []error{}
for _, trackEncoding := range r.trackEncodings {
r.api.interceptor.UnbindLocalStream(&trackEncoding.streamInfo)

errs = append(errs, trackEncoding.srtpStream.Close())

if trackEncoding.srtpStream != nil {
errs = append(errs, trackEncoding.srtpStream.Close())
}
if trackEncoding.rtxSrtpStream != nil {
r.api.interceptor.UnbindLocalStream(&trackEncoding.rtxStreamInfo)
errs = append(errs, trackEncoding.rtxSrtpStream.Close())
}
}

return util.FlattenErrs(errs)
Expand Down Expand Up @@ -393,6 +446,36 @@ func (r *RTPSender) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
return pkts, attributes, nil
}

// ReadRtx reads incoming RTX Stream RTCP for this RTPSender
func (r *RTPSender) ReadRtx(b []byte) (n int, a interceptor.Attributes, err error) {
if r.trackEncodings[0].rtxRtcpInterceptor == nil {
return 0, nil, io.ErrNoProgress
}

select {
case <-r.sendCalled:
return r.trackEncodings[0].rtxRtcpInterceptor.Read(b, a)
case <-r.stopCalled:
return 0, nil, io.ErrClosedPipe
}
}

// ReadRtxRTCP is a convenience method that wraps ReadRtx and unmarshals for you.
func (r *RTPSender) ReadRtxRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
b := make([]byte, r.api.settingEngine.getReceiveMTU())
i, attributes, err := r.ReadRtx(b)
if err != nil {
return nil, nil, err
}

pkts, err := rtcp.Unmarshal(b[:i])
if err != nil {
return nil, nil, err
}

return pkts, attributes, nil
}

// ReadSimulcast reads incoming RTCP for this RTPSender for given rid
func (r *RTPSender) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
select {
Expand Down
6 changes: 6 additions & 0 deletions sdp.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,13 @@ func addSenderSDP(

sendParameters := sender.GetParameters()
for _, encoding := range sendParameters.Encodings {
if encoding.RTX.SSRC != 0 {
media = media.WithValueAttribute(sdp.AttrKeySSRCGroup, fmt.Sprintf("FID %d %d", encoding.SSRC, encoding.RTX.SSRC))
}
media = media.WithMediaSource(uint32(encoding.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
if encoding.RTX.SSRC != 0 {
media = media.WithMediaSource(uint32(encoding.RTX.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
}
if !isPlanB {
media = media.WithPropertyAttribute("msid:" + track.StreamID() + " " + track.ID())
}
Expand Down
6 changes: 6 additions & 0 deletions settingengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type SettingEngine struct {
disableMediaEngineCopy bool
srtpProtectionProfiles []dtls.SRTPProtectionProfile
receiveMTU uint
trackLocalRtx bool
}

// getReceiveMTU returns the configured MTU. If SettingEngine's MTU is configured to 0 it returns the default
Expand Down Expand Up @@ -434,3 +435,8 @@ func (e *SettingEngine) SetDTLSKeyLogWriter(writer io.Writer) {
func (e *SettingEngine) SetSCTPMaxReceiveBufferSize(maxReceiveBufferSize uint32) {
e.sctp.maxReceiveBufferSize = maxReceiveBufferSize
}

// SetTrackLocalRtx allows track local use RTX.
func (e *SettingEngine) SetTrackLocalRtx(enable bool) {
e.trackLocalRtx = enable
}
5 changes: 3 additions & 2 deletions track_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ type TrackLocalContext interface {
}

type baseTrackLocalContext struct {
id string
params RTPParameters
id string
params RTPParameters

ssrc SSRC
writeStream TrackLocalWriter
rtcpInterceptor interceptor.RTCPReader
Expand Down

0 comments on commit 62e4e71

Please sign in to comment.