diff --git a/mediaengine.go b/mediaengine.go index 7abb8c3a798..96dd3ddd628 100644 --- a/mediaengine.go +++ b/mediaengine.go @@ -47,6 +47,9 @@ const ( // MimeTypePCMA PCMA MIME type // Note: Matching should be case insensitive. MimeTypePCMA = "audio/PCMA" + // MimeTypeFlexFEC03 FlexFEC03 MIME type + // Note: Matching should be case insensitive. + MimeTypeFlexFEC03 = "video/flexfec-03" ) type mediaEngineHeaderExtension struct { diff --git a/rtpcodec.go b/rtpcodec.go index 5bbe8793721..d4a0e01d4c8 100644 --- a/rtpcodec.go +++ b/rtpcodec.go @@ -145,3 +145,13 @@ func codecParametersAssociatedSearch(needle RTPCodecParameters, haystack []RTPCo return RTPCodecParameters{}, codecMatchNone } + +// Do a search by mime type in the list of codecs +func codecParametersSearchByMimeType(mimeType string, haystack []RTPCodecParameters) (codecs []RTPCodecParameters) { + for i, c := range haystack { + if c.MimeType == mimeType { + codecs = append(codecs, haystack[i]) + } + } + return codecs +} diff --git a/rtpcodingparameters.go b/rtpcodingparameters.go index f03d8c35f45..d24d09129ca 100644 --- a/rtpcodingparameters.go +++ b/rtpcodingparameters.go @@ -9,6 +9,11 @@ type RTPRtxParameters struct { SSRC SSRC `json:"ssrc"` } +// RTPFecParameters dictionary contains information relating to FEC settings. +type RTPFecParameters struct { + SSRC SSRC `json:"ssrc"` +} + // RTPCodingParameters provides information relating to both encoding and decoding. // This is a subset of the RFC since Pion WebRTC doesn't implement encoding/decoding itself // http://draft.ortc.org/#dom-rtcrtpcodingparameters @@ -17,4 +22,5 @@ type RTPCodingParameters struct { SSRC SSRC `json:"ssrc"` PayloadType PayloadType `json:"payloadType"` RTX RTPRtxParameters `json:"rtx"` + FEC RTPFecParameters `json:"fec,omitempty"` } diff --git a/rtpsender.go b/rtpsender.go index e003574b747..71726067293 100644 --- a/rtpsender.go +++ b/rtpsender.go @@ -32,6 +32,11 @@ type trackEncoding struct { rtxSrtpStream *srtpWriterFuture rtxRtcpInterceptor interceptor.RTCPReader rtxStreamInfo interceptor.StreamInfo + + fecSsrc SSRC + fecSrtpStream *srtpWriterFuture + fecRtcpInterceptor interceptor.RTCPReader + fecStreamInfo interceptor.StreamInfo } // RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer @@ -125,6 +130,7 @@ func (r *RTPSender) getParameters() RTPSendParameters { SSRC: trackEncoding.ssrc, PayloadType: r.payloadType, RTX: RTPRtxParameters{SSRC: trackEncoding.rtxSsrc}, + FEC: RTPFecParameters{SSRC: trackEncoding.fecSsrc}, }, }) } @@ -223,6 +229,13 @@ func (r *RTPSender) addEncoding(track TrackLocal) { } } + if r.api.settingEngine.trackLocalFlexfec { + codecs := r.api.mediaEngine.getCodecsByKind(track.Kind()) + if len(codecParametersSearchByMimeType(MimeTypeFlexFEC03, codecs)) > 0 { + trackEncoding.fecSsrc = SSRC(randutil.NewMathRandomGenerator().Uint32()) + } + } + r.trackEncodings = append(r.trackEncodings, trackEncoding) } @@ -314,8 +327,14 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error { return errRTPSenderTrackRemoved } - for idx, trackEncoding := range r.trackEncodings { + for idx := range r.trackEncodings { + trackEncoding := r.trackEncodings[idx] + srtpStream := &srtpWriterFuture{ssrc: parameters.Encodings[idx].SSRC, rtpSender: r} writeStream := &interceptorToTrackLocalWriter{} + fecCodecs := codecParametersSearchByMimeType(MimeTypeFlexFEC03, r.api.mediaEngine.getCodecsByKind(r.kind)) + + trackEncoding.srtpStream = srtpStream + trackEncoding.ssrc = parameters.Encodings[idx].SSRC trackEncoding.context = &baseTrackLocalContext{ id: r.id, params: r.api.mediaEngine.getRTPParametersByKind(trackEncoding.track.Kind(), []RTPTransceiverDirection{RTPTransceiverDirectionSendonly}), @@ -337,7 +356,18 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error { codec.RTPCodecCapability, parameters.HeaderExtensions, ) - srtpStream := trackEncoding.srtpStream + + if len(fecCodecs) > 0 { + trackEncoding.streamInfo.Attributes.Set("flexfec-03", struct{}{}) + } + + trackEncoding.rtcpInterceptor = r.api.interceptor.BindRTCPReader( + interceptor.RTCPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, _ interceptor.Attributes, err error) { + n, err = trackEncoding.srtpStream.Read(in) + return n, a, err + }), + ) + rtpInterceptor := r.api.interceptor.BindLocalStream( &trackEncoding.streamInfo, interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { @@ -376,6 +406,37 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error { }), ) } + + if len(fecCodecs) > 0 && + parameters.Encodings[idx].FEC.SSRC != 0 { + fecSrtpStream := &srtpWriterFuture{ssrc: parameters.Encodings[idx].FEC.SSRC, rtpSender: r} + + trackEncoding.fecSrtpStream = fecSrtpStream + trackEncoding.fecSsrc = parameters.Encodings[idx].FEC.SSRC + + trackEncoding.fecStreamInfo = *createStreamInfo( + r.id+"_fec", + parameters.Encodings[idx].FEC.SSRC, + fecCodecs[0].PayloadType, + fecCodecs[0].RTPCodecCapability, + parameters.HeaderExtensions, + ) + trackEncoding.fecStreamInfo.Attributes.Set("apt_ssrc", uint32(parameters.Encodings[idx].SSRC)) + + trackEncoding.fecRtcpInterceptor = r.api.interceptor.BindRTCPReader( + interceptor.RTCPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) { + n, err = trackEncoding.fecSrtpStream.Read(in) + return n, a, err + }), + ) + + r.api.interceptor.BindLocalStream( + &trackEncoding.fecStreamInfo, + interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, _ interceptor.Attributes) (int, error) { + return fecSrtpStream.WriteRTP(header, payload) + }), + ) + } } close(r.sendCalled) @@ -415,6 +476,10 @@ func (r *RTPSender) Stop() error { r.api.interceptor.UnbindLocalStream(&trackEncoding.rtxStreamInfo) errs = append(errs, trackEncoding.rtxSrtpStream.Close()) } + if trackEncoding.fecSrtpStream != nil { + r.api.interceptor.UnbindLocalStream(&trackEncoding.fecStreamInfo) + errs = append(errs, trackEncoding.fecSrtpStream.Close()) + } } return util.FlattenErrs(errs) @@ -476,6 +541,36 @@ func (r *RTPSender) ReadRtxRTCP() ([]rtcp.Packet, interceptor.Attributes, error) return pkts, attributes, nil } +// ReadFec reads incoming FEC Stream RTCP for this RTPSender +func (r *RTPSender) ReadFec(b []byte) (n int, a interceptor.Attributes, err error) { + if r.trackEncodings[0].fecRtcpInterceptor == nil { + return 0, nil, io.ErrNoProgress + } + + select { + case <-r.sendCalled: + return r.trackEncodings[0].fecRtcpInterceptor.Read(b, a) + case <-r.stopCalled: + return 0, nil, io.ErrClosedPipe + } +} + +// ReadFecRTCP is a convenience method that wraps ReadFec and unmarshals for you. +func (r *RTPSender) ReadFecRTCP() ([]rtcp.Packet, interceptor.Attributes, error) { + b := make([]byte, r.api.settingEngine.getReceiveMTU()) + i, attributes, err := r.ReadFec(b) + if err != nil { + return nil, nil, err + } + + pkts, err := rtcp.Unmarshal(b[:i]) + if err != nil { + return nil, nil, err + } + + return pkts, attributes, nil +} + // ReadSimulcast reads incoming RTCP for this RTPSender for given rid func (r *RTPSender) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) { select { diff --git a/sdp.go b/sdp.go index f61407780a6..47e27eb656d 100644 --- a/sdp.go +++ b/sdp.go @@ -392,10 +392,16 @@ func addSenderSDP( if encoding.RTX.SSRC != 0 { media = media.WithValueAttribute(sdp.AttrKeySSRCGroup, fmt.Sprintf("FID %d %d", encoding.SSRC, encoding.RTX.SSRC)) } + if encoding.FEC.SSRC != 0 { + media = media.WithValueAttribute(sdp.AttrKeySSRCGroup, fmt.Sprintf("FEC-FR %d %d", encoding.SSRC, encoding.FEC.SSRC)) + } media = media.WithMediaSource(uint32(encoding.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID()) if encoding.RTX.SSRC != 0 { media = media.WithMediaSource(uint32(encoding.RTX.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID()) } + if encoding.FEC.SSRC != 0 { + media = media.WithMediaSource(uint32(encoding.FEC.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID()) + } if !isPlanB { media = media.WithPropertyAttribute("msid:" + track.StreamID() + " " + track.ID()) } diff --git a/settingengine.go b/settingengine.go index 7349ec52b3e..a2215f8dab7 100644 --- a/settingengine.go +++ b/settingengine.go @@ -92,6 +92,7 @@ type SettingEngine struct { srtpProtectionProfiles []dtls.SRTPProtectionProfile receiveMTU uint trackLocalRtx bool + trackLocalFlexfec bool } // getReceiveMTU returns the configured MTU. If SettingEngine's MTU is configured to 0 it returns the default @@ -440,3 +441,8 @@ func (e *SettingEngine) SetSCTPMaxReceiveBufferSize(maxReceiveBufferSize uint32) func (e *SettingEngine) SetTrackLocalRtx(enable bool) { e.trackLocalRtx = enable } + +// SetTrackLocalFlexfec allows track local use FlexFEC. +func (e *SettingEngine) SetTrackLocalFlexfec(enable bool) { + e.trackLocalFlexfec = enable +}