Skip to content

Commit

Permalink
SFU Migration support
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielNovak committed Oct 18, 2023
1 parent 463b9bf commit 5f81a2e
Show file tree
Hide file tree
Showing 15 changed files with 3,364 additions and 98 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ Video roadmap and changelog is available [here](https://github.com/GetStream/pro
- [X] Android SDK development.md cleanup (Jaewoong)
- [X] Upgrade to more recent versions of webrtc (Jaewoong/Kanat)
- [X] Review foreground service vs backend for audio rooms etc. (Aleks)
- [X] Enable SFU switching
- [ ] Logging is too verbose (rtc is very noisy), clean it up to focus on the essential for info and higher (Daniel)

### 0.5.0 milestone

- [ ] Enable SFU switching
- [ ] H264 workaround on Samsung 23? (see https://github.com/livekit/client-sdk-android/blob/main/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/SimulcastVideoEncoderFactoryWrapper.kt#L34 and
- https://github.com/react-native-webrtc/react-native-webrtc/issues/983#issuecomment-975624906)
- [ ] Test coverage
Expand Down
1,244 changes: 1,239 additions & 5 deletions stream-video-android-core/api/stream-video-android-core.api

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions stream-video-android-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ android {
testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner"
consumerProguardFiles("consumer-proguard-rules.pro")
buildConfigField("String", "STREAM_VIDEO_VERSION", "\"${Configuration.versionName}\"")
buildConfigField("Integer", "STREAM_VIDEO_VERSION_MAJOR", "${Configuration.majorVersion}")
buildConfigField("Integer", "STREAM_VIDEO_VERSION_MINOR", "${Configuration.minorVersion}")
buildConfigField("Integer", "STREAM_VIDEO_VERSION_PATCH", "${Configuration.patchVersion}")

}

buildFeatures {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class ReconnectTest : IntegrationTestBase(connectCoordinatorWS = false) {

// the socket and rtc connection disconnect...,
// or ice candidate don't arrive due to temporary network failure
call.session?.reconnect()
call.session?.reconnect(forceRestart = true)

// leave and clean up a call
call.leave()
Expand Down Expand Up @@ -172,7 +172,7 @@ class ReconnectTest : IntegrationTestBase(connectCoordinatorWS = false) {
// connect to the new socket
// do an ice restart
call.session?.let {
it.switchSfu(it.sfuUrl, it.sfuToken, it.remoteIceServers)
it.switchSfu(it.sfuUrl, it.sfuToken, it.sfuToken, it.remoteIceServers)
}

// assert the publisher is still connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import io.getstream.video.android.core.call.audio.AudioFilter
import io.getstream.video.android.core.call.utils.SoundInputProcessor
import io.getstream.video.android.core.call.video.VideoFilter
import io.getstream.video.android.core.call.video.YuvFrame
import io.getstream.video.android.core.events.GoAwayEvent
import io.getstream.video.android.core.events.VideoEventListener
import io.getstream.video.android.core.internal.InternalStreamVideoApi
import io.getstream.video.android.core.model.MuteUsersData
Expand Down Expand Up @@ -377,25 +378,26 @@ public class Call(
val iceServers = result.value.credentials.iceServers.map { it.toIceServer() }
timer.split("join request completed")

if (session == null) {
session = if (testInstanceProvider.rtcSessionCreator != null) {
testInstanceProvider.rtcSessionCreator!!.invoke()
} else {
RtcSession(
client = client,
call = this,
sfuUrl = sfuUrl,
sfuToken = sfuToken,
connectionModule = (client as StreamVideoImpl).connectionModule,
remoteIceServers = iceServers,
)
}
session = if (testInstanceProvider.rtcSessionCreator != null) {
testInstanceProvider.rtcSessionCreator!!.invoke()
} else {
RtcSession(
client = client,
call = this,
sfuUrl = sfuUrl,
sfuToken = sfuToken,
connectionModule = (client as StreamVideoImpl).connectionModule,
remoteIceServers = iceServers,
onMigrationCompleted = {
state._connection.value = RealtimeConnection.Connected
monitor.check()
},
)
}

session?.let {
state._connection.value = RealtimeConnection.Joined(it)
}

timer.split("rtc session init")

session?.connect()
Expand Down Expand Up @@ -438,7 +440,7 @@ public class Call(
// listen to Signal WS
scope.launch {
session?.let {
it.sfuConnectionModule.sfuSocket.connectionState.collect { sfuSocketState ->
it.sfuSocketState.collect { sfuSocketState ->
if (sfuSocketState is SocketState.DisconnectedPermanently) {
handleSignalChannelDisconnect(isRetry = false)
}
Expand All @@ -455,6 +457,11 @@ public class Call(
// Prevent multiple starts of the reconnect flow. For the start call
// first check if sfuSocketReconnectionTime isn't already set - if yes
// then we are already doing a full reconnect
if (state._connection.value == RealtimeConnection.Migrating) {
logger.d { "Skipping disconnected channel event - we are migrating" }
return
}

if (!isRetry && sfuSocketReconnectionTime != null) {
logger.d { "[handleSignalChannelDisconnect] Already doing a full reconnect cycle - ignoring call" }
return
Expand Down Expand Up @@ -499,22 +506,35 @@ public class Call(
return clientImpl.sendStats(type, id, data)
}

suspend fun switchSfu(forceSwitch: Boolean = false) {
suspend fun switchSfu() {
state._connection.value = RealtimeConnection.Migrating

location?.let {
val joinResponse = joinRequest(location = it, currentSfu = session?.sfuUrl)
val shouldSwitch = false
val joinResponse = joinRequest(location = it, migratingFrom = session?.sfuUrl)

if ((shouldSwitch || forceSwitch) && joinResponse is Success) {
if (joinResponse is Success) {
// switch to the new SFU
val cred = joinResponse.value.credentials
logger.i { "Switching SFU from ${session?.sfuUrl} to ${cred.server.url}" }
val iceServers = cred.iceServers.map { it.toIceServer() }
session?.switchSfu(cred.server.url, cred.token, iceServers)

session?.switchSfu(cred.server.edgeName, cred.server.url, cred.token, iceServers, failedToSwitch = {
logger.e {
"[switchSfu] Failed to connect to new SFU during migration. Reverting to full reconnect"
}
state._connection.value = RealtimeConnection.Reconnecting
})
} else {
logger.e {
"[switchSfu] Failed to get a join response during " +
"migration - falling back to reconnect. Error ${joinResponse.errorOrNull()}"
}
state._connection.value = RealtimeConnection.Reconnecting
}
}
}

suspend fun reconnectOrSwitchSfu(forceRestart: Boolean) {
suspend fun reconnect(forceRestart: Boolean) {
// mark us as reconnecting
val connectionState = state._connection.value

Expand All @@ -528,10 +548,6 @@ public class Call(
if (online) {
// start by restarting ice connections
session?.reconnect(forceRestart = forceRestart)

// ask the coordinator if we should switch
// TODO: disabled since switching SFUs isn't 100% stable yet server side
// switchSfu()
}
}

Expand Down Expand Up @@ -615,6 +631,22 @@ public class Call(
session?.updateTrackDimensions(sessionId, trackType, visible)
}

fun handleEvent(event: VideoEvent) {
logger.i { "[call handleEvent] #sfu; event: $event" }

when (event) {
is GoAwayEvent ->
scope.launch {
handleSessionMigrationEvent()
}
}
}

private suspend fun handleSessionMigrationEvent() {
logger.d { "[handleSessionMigrationEvent] Received goAway event - starting migration" }
switchSfu()
}

// TODO: review this
/**
* Perhaps it would be nicer to have an interface. Any UI elements that renders video should implement it
Expand Down Expand Up @@ -891,7 +923,7 @@ public class Call(
internal suspend fun joinRequest(
create: CreateCallOptions? = null,
location: String,
currentSfu: String? = null,
migratingFrom: String? = null,
ring: Boolean = false,
notify: Boolean = false,
): Result<JoinCallResponse> {
Expand All @@ -906,6 +938,7 @@ public class Call(
ring = ring,
notify = notify,
location = location,
migratingFrom = migratingFrom,
)
result.onSuccess {
state.updateFromResponse(it)
Expand Down Expand Up @@ -987,7 +1020,7 @@ public class Call(

public fun switchSfu() {
call.scope.launch {
call.switchSfu(true)
call.switchSfu()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ public class CallHealthMonitor(
*/
@Synchronized
fun check() {
// skip health checks if we are migrating
if (call.state._connection.value == RealtimeConnection.Migrating) {
logger.d { "Skipping health-check - we are migrating" }
return
}

val subscriberState = call.session?.subscriber?.state?.value
val publisherState = call.session?.publisher?.state?.value
val healthyPeerConnections = subscriberState in goodStates && publisherState in goodStates
Expand Down Expand Up @@ -154,6 +160,11 @@ public class CallHealthMonitor(
return
}

if (call.state._connection.value == RealtimeConnection.Migrating) {
logger.d { "[reconnect] Skipping reconnect - already migrating" }
return
}

logger.i { "attempting to reconnect" }

reconnectInProgress = true
Expand All @@ -175,7 +186,7 @@ public class CallHealthMonitor(
logger.d { "[reconnect] skipping reconnect - too often" }
} else {
lastReconnectAt = now
call.reconnectOrSwitchSfu(forceRestart = forceRestart)
call.reconnect(forceRestart = forceRestart)
}

reconnectInProgress = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public sealed interface RealtimeConnection {
*/
public data object Reconnecting :
RealtimeConnection // reconnecting to recover from temporary issues

public data object Migrating : RealtimeConnection
public data class Failed(val error: Any) : RealtimeConnection // permanent failure
public data object Disconnected : RealtimeConnection // normal disconnect by the app
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ internal class StreamVideoImpl internal constructor(
calls[selectedCid]?.let {
it.state.handleEvent(event)
it.session?.handleEvent(event)
it.handleEvent(event)
}
}

Expand Down Expand Up @@ -641,6 +642,7 @@ internal class StreamVideoImpl internal constructor(
ring: Boolean = false,
notify: Boolean = false,
location: String,
migratingFrom: String?,
): Result<JoinCallResponse> {
val joinCallRequest = JoinCallRequest(
create = create,
Expand All @@ -654,6 +656,7 @@ internal class StreamVideoImpl internal constructor(
ring = ring,
notify = notify,
location = location,
migratingFrom = migratingFrom,
)

val result = wrapAPICall {
Expand Down
Loading

0 comments on commit 5f81a2e

Please sign in to comment.