Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SFU Migration support #872

Merged
merged 3 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ Video roadmap and changelog is available [here](https://github.com/GetStream/pro
- [X] Android SDK development.md cleanup (Jaewoong)
- [X] Upgrade to more recent versions of webrtc (Jaewoong/Kanat)
- [X] Review foreground service vs backend for audio rooms etc. (Aleks)
- [X] Enable SFU switching
- [ ] Logging is too verbose (rtc is very noisy), clean it up to focus on the essential for info and higher (Daniel)

### 0.5.0 milestone

- [ ] Enable SFU switching
- [ ] H264 workaround on Samsung 23? (see https://github.com/livekit/client-sdk-android/blob/main/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/SimulcastVideoEncoderFactoryWrapper.kt#L34 and
- https://github.com/react-native-webrtc/react-native-webrtc/issues/983#issuecomment-975624906)
- [ ] Test coverage
Expand Down
1,244 changes: 1,239 additions & 5 deletions stream-video-android-core/api/stream-video-android-core.api

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions stream-video-android-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ android {
testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner"
consumerProguardFiles("consumer-proguard-rules.pro")
buildConfigField("String", "STREAM_VIDEO_VERSION", "\"${Configuration.versionName}\"")
buildConfigField("Integer", "STREAM_VIDEO_VERSION_MAJOR", "${Configuration.majorVersion}")
buildConfigField("Integer", "STREAM_VIDEO_VERSION_MINOR", "${Configuration.minorVersion}")
buildConfigField("Integer", "STREAM_VIDEO_VERSION_PATCH", "${Configuration.patchVersion}")

}

buildFeatures {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class ReconnectTest : IntegrationTestBase(connectCoordinatorWS = false) {

// the socket and rtc connection disconnect...,
// or ice candidate don't arrive due to temporary network failure
call.session?.reconnect()
call.session?.reconnect(forceRestart = true)

// leave and clean up a call
call.leave()
Expand Down Expand Up @@ -172,7 +172,7 @@ class ReconnectTest : IntegrationTestBase(connectCoordinatorWS = false) {
// connect to the new socket
// do an ice restart
call.session?.let {
it.switchSfu(it.sfuUrl, it.sfuToken, it.remoteIceServers)
it.switchSfu(it.sfuUrl, it.sfuToken, it.sfuToken, it.remoteIceServers)
}

// assert the publisher is still connected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import io.getstream.video.android.core.call.audio.AudioFilter
import io.getstream.video.android.core.call.utils.SoundInputProcessor
import io.getstream.video.android.core.call.video.VideoFilter
import io.getstream.video.android.core.call.video.YuvFrame
import io.getstream.video.android.core.events.GoAwayEvent
import io.getstream.video.android.core.events.VideoEventListener
import io.getstream.video.android.core.internal.InternalStreamVideoApi
import io.getstream.video.android.core.model.MuteUsersData
Expand Down Expand Up @@ -377,25 +378,26 @@ public class Call(
val iceServers = result.value.credentials.iceServers.map { it.toIceServer() }
timer.split("join request completed")

if (session == null) {
session = if (testInstanceProvider.rtcSessionCreator != null) {
testInstanceProvider.rtcSessionCreator!!.invoke()
} else {
RtcSession(
client = client,
call = this,
sfuUrl = sfuUrl,
sfuToken = sfuToken,
connectionModule = (client as StreamVideoImpl).connectionModule,
remoteIceServers = iceServers,
)
}
session = if (testInstanceProvider.rtcSessionCreator != null) {
testInstanceProvider.rtcSessionCreator!!.invoke()
} else {
RtcSession(
client = client,
call = this,
sfuUrl = sfuUrl,
sfuToken = sfuToken,
connectionModule = (client as StreamVideoImpl).connectionModule,
remoteIceServers = iceServers,
onMigrationCompleted = {
state._connection.value = RealtimeConnection.Connected
monitor.check()
},
)
}

session?.let {
state._connection.value = RealtimeConnection.Joined(it)
}

timer.split("rtc session init")

session?.connect()
Expand Down Expand Up @@ -438,7 +440,7 @@ public class Call(
// listen to Signal WS
scope.launch {
session?.let {
it.sfuConnectionModule.sfuSocket.connectionState.collect { sfuSocketState ->
it.sfuSocketState.collect { sfuSocketState ->
if (sfuSocketState is SocketState.DisconnectedPermanently) {
handleSignalChannelDisconnect(isRetry = false)
}
Expand All @@ -455,6 +457,11 @@ public class Call(
// Prevent multiple starts of the reconnect flow. For the start call
// first check if sfuSocketReconnectionTime isn't already set - if yes
// then we are already doing a full reconnect
if (state._connection.value == RealtimeConnection.Migrating) {
logger.d { "Skipping disconnected channel event - we are migrating" }
return
}

if (!isRetry && sfuSocketReconnectionTime != null) {
logger.d { "[handleSignalChannelDisconnect] Already doing a full reconnect cycle - ignoring call" }
return
Expand Down Expand Up @@ -499,22 +506,35 @@ public class Call(
return clientImpl.sendStats(type, id, data)
}

suspend fun switchSfu(forceSwitch: Boolean = false) {
suspend fun switchSfu() {
state._connection.value = RealtimeConnection.Migrating

location?.let {
val joinResponse = joinRequest(location = it, currentSfu = session?.sfuUrl)
val shouldSwitch = false
val joinResponse = joinRequest(location = it, migratingFrom = session?.sfuUrl)

if ((shouldSwitch || forceSwitch) && joinResponse is Success) {
if (joinResponse is Success) {
// switch to the new SFU
val cred = joinResponse.value.credentials
logger.i { "Switching SFU from ${session?.sfuUrl} to ${cred.server.url}" }
val iceServers = cred.iceServers.map { it.toIceServer() }
session?.switchSfu(cred.server.url, cred.token, iceServers)

session?.switchSfu(cred.server.edgeName, cred.server.url, cred.token, iceServers, failedToSwitch = {
logger.e {
"[switchSfu] Failed to connect to new SFU during migration. Reverting to full reconnect"
}
state._connection.value = RealtimeConnection.Reconnecting
})
} else {
logger.e {
"[switchSfu] Failed to get a join response during " +
"migration - falling back to reconnect. Error ${joinResponse.errorOrNull()}"
}
state._connection.value = RealtimeConnection.Reconnecting
}
}
}

suspend fun reconnectOrSwitchSfu(forceRestart: Boolean) {
suspend fun reconnect(forceRestart: Boolean) {
// mark us as reconnecting
val connectionState = state._connection.value

Expand All @@ -528,10 +548,6 @@ public class Call(
if (online) {
// start by restarting ice connections
session?.reconnect(forceRestart = forceRestart)

// ask the coordinator if we should switch
// TODO: disabled since switching SFUs isn't 100% stable yet server side
// switchSfu()
}
}

Expand Down Expand Up @@ -615,6 +631,22 @@ public class Call(
session?.updateTrackDimensions(sessionId, trackType, visible)
}

fun handleEvent(event: VideoEvent) {
logger.i { "[call handleEvent] #sfu; event: $event" }

when (event) {
is GoAwayEvent ->
scope.launch {
handleSessionMigrationEvent()
}
}
}

private suspend fun handleSessionMigrationEvent() {
logger.d { "[handleSessionMigrationEvent] Received goAway event - starting migration" }
switchSfu()
}

// TODO: review this
/**
* Perhaps it would be nicer to have an interface. Any UI elements that renders video should implement it
Expand Down Expand Up @@ -891,7 +923,7 @@ public class Call(
internal suspend fun joinRequest(
create: CreateCallOptions? = null,
location: String,
currentSfu: String? = null,
migratingFrom: String? = null,
ring: Boolean = false,
notify: Boolean = false,
): Result<JoinCallResponse> {
Expand All @@ -906,6 +938,7 @@ public class Call(
ring = ring,
notify = notify,
location = location,
migratingFrom = migratingFrom,
)
result.onSuccess {
state.updateFromResponse(it)
Expand Down Expand Up @@ -987,7 +1020,7 @@ public class Call(

public fun switchSfu() {
call.scope.launch {
call.switchSfu(true)
call.switchSfu()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ public class CallHealthMonitor(
*/
@Synchronized
fun check() {
// skip health checks if we are migrating
if (call.state._connection.value == RealtimeConnection.Migrating) {
logger.d { "Skipping health-check - we are migrating" }
return
}

val subscriberState = call.session?.subscriber?.state?.value
val publisherState = call.session?.publisher?.state?.value
val healthyPeerConnections = subscriberState in goodStates && publisherState in goodStates
Expand Down Expand Up @@ -154,6 +160,11 @@ public class CallHealthMonitor(
return
}

if (call.state._connection.value == RealtimeConnection.Migrating) {
logger.d { "[reconnect] Skipping reconnect - already migrating" }
return
}

logger.i { "attempting to reconnect" }

reconnectInProgress = true
Expand All @@ -175,7 +186,7 @@ public class CallHealthMonitor(
logger.d { "[reconnect] skipping reconnect - too often" }
} else {
lastReconnectAt = now
call.reconnectOrSwitchSfu(forceRestart = forceRestart)
call.reconnect(forceRestart = forceRestart)
}

reconnectInProgress = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public sealed interface RealtimeConnection {
*/
public data object Reconnecting :
RealtimeConnection // reconnecting to recover from temporary issues

public data object Migrating : RealtimeConnection
public data class Failed(val error: Any) : RealtimeConnection // permanent failure
public data object Disconnected : RealtimeConnection // normal disconnect by the app
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ internal class StreamVideoImpl internal constructor(
calls[selectedCid]?.let {
it.state.handleEvent(event)
it.session?.handleEvent(event)
it.handleEvent(event)
}
}

Expand Down Expand Up @@ -641,6 +642,7 @@ internal class StreamVideoImpl internal constructor(
ring: Boolean = false,
notify: Boolean = false,
location: String,
migratingFrom: String?,
): Result<JoinCallResponse> {
val joinCallRequest = JoinCallRequest(
create = create,
Expand All @@ -654,6 +656,7 @@ internal class StreamVideoImpl internal constructor(
ring = ring,
notify = notify,
location = location,
migratingFrom = migratingFrom,
)

val result = wrapAPICall {
Expand Down
Loading
Loading