Skip to content

Commit

Permalink
Merge pull request #198 from dolbyio-samples/feature/rts-viewer-2.0
Browse files Browse the repository at this point in the history
Integrate 2.0.0 SDK into RTS Viewer tvOS
  • Loading branch information
aravind-raveendran authored Jun 27, 2024
2 parents 3349193 + fe3ab02 commit 3ec0671
Show file tree
Hide file tree
Showing 248 changed files with 1,648 additions and 5,460 deletions.
Binary file added .DS_Store
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,14 @@ public final class RendererRegistry {
return renderer
}
}

public func acceleratedRenderer(for source: StreamSource) -> MCVideoSwiftUIView.Renderer {
if let renderer = renderers[source.sourceId] {
return renderer
} else {
let renderer: MCVideoSwiftUIView.Renderer = .accelerated(MCAcceleratedVideoRenderer())
renderers[source.sourceId] = renderer
return renderer
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ public actor SubscriptionManager: ObservableObject {
category: String(describing: SubscriptionManager.self)
)

public enum SubscriptionError: Error, Equatable {
case signalingError(reason: String)
case connectError(status: NSNumber, reason: String)
public struct ConnectionError: Error, Equatable {
public let status: NSNumber
public let reason: String
}

public enum State: Equatable {
case subscribed(sources: [StreamSource])
case error(SubscriptionError)
case stopped
case error(ConnectionError)
case disconnected
}

Expand Down Expand Up @@ -140,35 +139,37 @@ extension SubscriptionManager {

let streamStoppedStateObservation = Task {
for await state in subscriber.streamStopped() {
guard !Task.isCancelled else { return }
Self.logger.debug("👨‍🔧 Stream stopped \(state.description)")
updateState(to: .stopped)
}
}

let taskHttpErrorStateObservation = Task {
for await state in subscriber.httpError() {
guard !Task.isCancelled else { return }
Self.logger.debug("👨‍🔧 Http error state changed to \(state.code), reason: \(state.reason)")
updateState(to: .error(.connectError(status: state.code, reason: state.reason)))
updateState(to: .error(ConnectionError(status: state.code, reason: state.reason)))
}
}

let taskSignalingErrorStateObservation = Task {
for await state in subscriber.signalingError() {
guard !Task.isCancelled else { return }
Self.logger.debug("👨‍🔧 Signalling error state: reason - \(state.reason)")
updateState(to: .error(.signalingError(reason: state.reason)))
}
}

let tracksObservation = Task {
for await track in subscriber.rtsRemoteTrackAdded() {
guard !Task.isCancelled else { return }
Self.logger.debug("👨‍🔧 Remote track added - \(track.sourceID)")
sourceBuilder.addTrack(track)
}
}

let statsObservation = Task {
for await statsReport in subscriber.statsReport() {
guard let stats = StreamStatistics(statsReport) else {
guard !Task.isCancelled, let stats = StreamStatistics(statsReport) else {
return
}
updateStats(stats)
Expand All @@ -177,6 +178,7 @@ extension SubscriptionManager {

let sourcesObservation = Task {
for await sources in sourceBuilder.sourceStream {
guard !Task.isCancelled else { return }
Self.logger.debug("👨‍🔧 Sources builder emitted \(sources)")
updateState(to: .subscribed(sources: sources))
}
Expand Down
149 changes: 63 additions & 86 deletions LocalPackages/RTSCore/Sources/RTSCore/Models/SourceBuilder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,16 @@ private class PartialSource {
didSet {
if let videoTrack {
source = StreamSource(sourceId: sourceId, videoTrack: videoTrack, audioTrack: audioTrack)
// Track is active when its first received
isVideoActive = true
source?.setAudioActive(isAudioActive)
}
}
}
var audioTrack: MCRTSRemoteAudioTrack? {
didSet {
if let audioTrack {
source?.addAudioTrack(audioTrack)
// Track is active when its first received
isAudioActive = true
}
}
}

var isVideoActive: Bool = false {
didSet {
source?.setVideoActive(isVideoActive)
}
}

var isAudioActive: Bool = false {
didSet {
source?.setAudioActive(isAudioActive)
}
}

private(set) var source: StreamSource?

init(sourceId: SourceID) {
Expand All @@ -59,17 +41,12 @@ final class SourceBuilder {
private var audioTrackActivityObservationDictionary: [SourceID: Task<Void, Never>] = [:]
private var videoTrackActivityObservationDictionary: [SourceID: Task<Void, Never>] = [:]

// FIXME: `MCRTSRemoteTrack.isActive` returns true even after receiving an inactive event.
// This prevents the application from filtering a list of active sources(video tracks) from the sources(video tracks) list
// The workaround is to maintain the list locally
private var inactiveVideoSources: Set<SourceID> = []
private var inactiveAudioSources: Set<SourceID> = []
private let audioTrackStateUpdateSubject: PassthroughSubject<(SourceID, Bool), Never> = PassthroughSubject()
private let videoTrackStateUpdateSubject: PassthroughSubject<(SourceID, Bool), Never> = PassthroughSubject()
private let audioTrackStateUpdateSubject: PassthroughSubject<SourceID, Never> = PassthroughSubject()
private let videoTrackStateUpdateSubject: PassthroughSubject<SourceID, Never> = PassthroughSubject()

private var sources: [StreamSource] = [] {
didSet {
sources.forEach { observeTrackEvents(for: $0) }
Self.logger.debug("👨‍🔧 Sources updated, \(self.sources)")
sourceStreamContinuation.yield(sources)
}
}
Expand All @@ -89,24 +66,22 @@ final class SourceBuilder {
self.sourceStream = stream

audioTrackStateUpdateSubject
.sink { [weak self] sourceId, active in
.sink { [weak self] sourceId in
guard let self, let source = self.partialSources.first(where: { $0.sourceId == sourceId }) else {
return
}
Self.logger.debug("👨‍🔧 Handle audio track active state change \(sourceId), \(active)")
source.isAudioActive = active
self.sources = self.makeSources()
Self.logger.debug("👨‍🔧 Handle audio track active state change \(sourceId); isActive \(source.audioTrack?.isActive == true)")
self.sourceStreamContinuation.yield(self.sources)
}
.store(in: &subscriptions)

videoTrackStateUpdateSubject
.sink { [weak self] sourceId, active in
.sink { [weak self] sourceId in
guard let self, let source = self.partialSources.first(where: { $0.sourceId == sourceId }) else {
return
}
Self.logger.debug("👨‍🔧 Handle video track active state change \(sourceId), \(active)")
source.isVideoActive = active
self.sources = self.makeSources()
Self.logger.debug("👨‍🔧 Handle video track active state change \(sourceId); isActive \(source.videoTrack?.isActive == true)")
self.sourceStreamContinuation.yield(self.sources)
}
.store(in: &subscriptions)
}
Expand All @@ -117,21 +92,34 @@ final class SourceBuilder {
let partialSource = partialSources.first(where: { $0.sourceId == sourceID }) ?? PartialSource(sourceId: sourceID)
if let audioTrack = track.asAudio() {
partialSource.audioTrack = audioTrack
observeAudioTrackEvents(for: audioTrack, sourceId: sourceID)
} else if let videoTrack = track.asVideo() {
partialSource.videoTrack = videoTrack
observeVideoTrackEvents(for: videoTrack, sourceId: sourceID)
}

if partialSources.firstIndex(where: { $0.sourceId == sourceID }) == nil {
partialSources.append(partialSource)
}
sources = makeSources()
guard let newSource = partialSource.source else {
return
}

if !sources.contains(where: { $0.sourceId == newSource.sourceId }) {
sources.append(newSource)
} else if let index = sources.firstIndex(where: { $0.sourceId == newSource.sourceId }) {
var updatesSources = sources
updatesSources.remove(at: index)
updatesSources.insert(newSource, at: index)
sources = updatesSources
}
}

func reset() {
Self.logger.debug("👨‍🔧 Reset source builder")

partialSources.removeAll()
subscriptions.removeAll()
inactiveVideoSources = []
inactiveAudioSources = []

audioTrackActivityObservationDictionary.forEach { (sourceId, _) in
audioTrackActivityObservationDictionary[sourceId]?.cancel()
Expand All @@ -147,63 +135,52 @@ final class SourceBuilder {

private extension SourceBuilder {

// swiftlint:disable cyclomatic_complexity function_body_length
func observeTrackEvents(for source: StreamSource) {
func observeAudioTrackEvents(for track: MCRTSRemoteAudioTrack, sourceId: SourceID) {
Task { [weak self] in
guard let self else { return }

var tasks: [Task<Void, Never>] = []
if let audioTrack = source.audioTrack, self.videoTrackActivityObservationDictionary[source.sourceId] == nil {
Self.logger.debug("👨‍🔧 Registering for audio track lifecycle events of \(source.sourceId)")
let audioTrackActivityObservation = Task {
for await activityEvent in audioTrack.activity() {
switch activityEvent {
case .active:
Self.logger.debug("👨‍🔧 Audio track for \(source.sourceId) is active")
self.inactiveAudioSources.remove(source.sourceId)
self.audioTrackStateUpdateSubject.send((source.sourceId, true))

case .inactive:
Self.logger.debug("👨‍🔧 Audio track for \(source.sourceId) is inactive")
self.inactiveAudioSources.insert(source.sourceId)
self.audioTrackStateUpdateSubject.send((source.sourceId, false))
}
guard let self, self.audioTrackActivityObservationDictionary[sourceId] == nil else {
return
}
Self.logger.debug("👨‍🔧 Registering for audio track lifecycle events of \(sourceId)")
let audioTrackActivityObservation = Task {
for await activityEvent in track.activity() {
guard !Task.isCancelled else { return }
switch activityEvent {
case .active:
Self.logger.debug("👨‍🔧 Audio track for \(sourceId) is active, \(track.isActive)")
self.audioTrackStateUpdateSubject.send(sourceId)

case .inactive:
Self.logger.debug("👨‍🔧 Audio track for \(sourceId) is inactive, \(track.isActive)")
self.audioTrackStateUpdateSubject.send(sourceId)
}
}
self.audioTrackActivityObservationDictionary[source.sourceId] = audioTrackActivityObservation
tasks.append(audioTrackActivityObservation)
}
self.audioTrackActivityObservationDictionary[sourceId] = audioTrackActivityObservation
await audioTrackActivityObservation.value
}
}

if self.videoTrackActivityObservationDictionary[source.sourceId] == nil {
Self.logger.debug("👨‍🔧 Registering for video track lifecycle events of \(source.sourceId)")
let videoTrackActivityObservation = Task {
for await activityEvent in source.videoTrack.activity() {
switch activityEvent {
case .active:
Self.logger.debug("👨‍🔧 Video track for \(source.sourceId) is active")
self.inactiveVideoSources.remove(source.sourceId)
self.videoTrackStateUpdateSubject.send((source.sourceId, true))

case .inactive:
Self.logger.debug("👨‍🔧 Video track for \(source.sourceId) is inactive")
self.inactiveVideoSources.insert(source.sourceId)
self.videoTrackStateUpdateSubject.send((source.sourceId, false))
}
func observeVideoTrackEvents(for track: MCRTSRemoteVideoTrack, sourceId: SourceID) {
Task { [weak self] in
guard let self, self.videoTrackActivityObservationDictionary[sourceId] == nil else { return }

Self.logger.debug("👨‍🔧 Registering for video track lifecycle events of \(sourceId)")
let videoTrackActivityObservation = Task {
for await activityEvent in track.activity() {
guard !Task.isCancelled else { return }
switch activityEvent {
case .active:
Self.logger.debug("👨‍🔧 Video track for \(sourceId) is active, \(track.isActive)")
self.videoTrackStateUpdateSubject.send(sourceId)

case .inactive:
Self.logger.debug("👨‍🔧 Video track for \(sourceId) is inactive, \(track.isActive)")
self.videoTrackStateUpdateSubject.send(sourceId)
}
}
self.videoTrackActivityObservationDictionary[source.sourceId] = videoTrackActivityObservation
tasks.append(videoTrackActivityObservation)
}

await withTaskGroup(of: Void.self) { group in
for task in tasks {
group.addTask { await task.value }
}
}
self.videoTrackActivityObservationDictionary[sourceId] = videoTrackActivityObservation
await videoTrackActivityObservation.value
}
}

func makeSources() -> [StreamSource] {
partialSources.compactMap { $0.source }
}
}
14 changes: 0 additions & 14 deletions LocalPackages/RTSCore/Sources/RTSCore/Models/StreamSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,15 @@ public struct StreamSource: Identifiable {
public let videoTrack: MCRTSRemoteVideoTrack
public private(set) var audioTrack: MCRTSRemoteAudioTrack?

// FIXME: `MCRTSRemoteTrack.isActive` returns `true` even after receiving an inactive event.
// Remove the below isVideoActive & isAudioActive properties once the issue is fixed
public private(set) var isVideoActive: Bool
public private(set) var isAudioActive: Bool = false

init(sourceId: SourceID, videoTrack: MCRTSRemoteVideoTrack, audioTrack: MCRTSRemoteAudioTrack? = nil) {
self.sourceId = sourceId
self.videoTrack = videoTrack
self.audioTrack = audioTrack
self.isVideoActive = true
}

mutating func addAudioTrack(_ track: MCRTSRemoteAudioTrack) {
audioTrack = track
}

mutating func setVideoActive(_ active: Bool) {
isVideoActive = active
}

mutating func setAudioActive(_ active: Bool) {
isAudioActive = active
}
}

extension StreamSource: Comparable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public struct StreamStatistics: Equatable, Hashable {
}

public struct StatsInboundRtp: Equatable, Hashable {
public let kind: String
public let sid: String
public let mid: String
public let kind: String?
public let sid: String?
public let mid: String?
public let decoder: String?
public let processingDelay: Double
public let decodeTime: Double
Expand Down Expand Up @@ -81,9 +81,9 @@ extension StreamStatistics {

extension StatsInboundRtp {
init(_ stats: MCInboundRtpStreamStats, codecStatsList: [MCCodecsStats]?) {
kind = stats.kind as String
sid = stats.sid as String
mid = stats.mid as String
kind = stats.kind
sid = stats.sid
mid = stats.mid
frameWidth = Int(stats.frame_width)
frameHeight = Int(stats.frame_height)
fps = Int(stats.frames_per_second)
Expand Down Expand Up @@ -116,15 +116,15 @@ extension StatsInboundRtp {
)
nackCount = Int(stats.nack_count)
packetsLost = Int(stats.packets_lost)
decoder = stats.decoder_implementation as String?
decoder = stats.decoder_implementation
audioLevel = Int(stats.audio_level)
totalEnergy = stats.total_audio_energy
totalSampleDuration = stats.total_samples_duration
codec = stats.codec_id as String?
codec = stats.codec_id
timestamp = Double(stats.timestamp)

if let codecStats = codecStatsList?.first(where: { $0.sid == stats.codec_id }) {
codecName = codecStats.mime_type as String
codecName = codecStats.mime_type
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ final class CoreDataManager {
}
container.loadPersistentStores(completionHandler: { _, error in
if let error = error {
fatalError("Failed loading persistent stores with error: \(error.localizedDescription)")
// fatalError("Failed loading persistent stores with error: \(error.localizedDescription)")
print("$$$ \("Failed loading persistent stores with error: \(error.localizedDescription)")")
}
})
}
Expand Down
Binary file added rts-viewer-tvos/.DS_Store
Binary file not shown.
Loading

0 comments on commit 3ec0671

Please sign in to comment.