Skip to content

Commit

Permalink
Fix pending ICE candidates during migration
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielNovak committed Oct 20, 2023
1 parent e208be9 commit db6012c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
Expand Down Expand Up @@ -173,6 +174,7 @@ public class RtcSession internal constructor(
internal val trackIdToParticipant: MutableStateFlow<Map<String, String>> =
MutableStateFlow(emptyMap())
private var syncSubscriberAnswer: Job? = null
private var syncSubscriberCandidates: Job? = null
private var syncPublisherJob: Job? = null
private var subscriptionSyncJob: Job? = null
private var muteStateSyncJob: Job? = null
Expand Down Expand Up @@ -1061,7 +1063,6 @@ public class RtcSession internal constructor(
coroutineScope.launch {
logger.v { "[onRtcEvent] event: $event" }
when (event) {
is ICETrickleEvent -> handleIceTrickle(event)
is SubscriberOfferEvent -> handleSubscriberOffer(event)
// this dynascale event tells the SDK to change the quality of the video it's uploading
is ChangePublishQualityEvent -> updatePublishQuality(event)
Expand Down Expand Up @@ -1189,6 +1190,13 @@ public class RtcSession internal constructor(
)
subscriber.setRemoteDescription(offerDescription)

syncSubscriberCandidates?.cancel()
syncSubscriberCandidates = coroutineScope.launch {
sfuConnectionModule.sfuSocket.pendingSubscriberIceCandidates.consumeEach { iceCandidates ->
subscriber.addIceCandidate(iceCandidates)
}
}

// step 2 - create the answer
val answerResult = subscriber.createAnswer()
if (answerResult !is Success) {
Expand Down Expand Up @@ -1328,6 +1336,14 @@ public class RtcSession internal constructor(
SessionDescription.Type.ANSWER, result.getOrThrow().sdp,
),
)

// start listening to ICE candidates
launch {
sfuConnectionModule.sfuSocket.pendingPublisherIceCandidates.consumeEach { iceCandidates ->
publisher?.addIceCandidate(iceCandidates)
}
}

emit(result.getOrThrow())
}.flowOn(DispatcherProvider.IO).retryWhen { cause, attempt ->
val sameValue = mangledSdp == publisherSdpOffer.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package io.getstream.video.android.core.call.connection

import io.getstream.log.taggedLogger
import io.getstream.result.Error
import io.getstream.result.Result
import io.getstream.result.Result.Failure
import io.getstream.video.android.core.call.stats.model.RtcStatsReport
import io.getstream.video.android.core.call.stats.toRtcStats
import io.getstream.video.android.core.call.utils.addRtcIceCandidate
Expand Down Expand Up @@ -113,12 +111,6 @@ public class StreamPeerConnection(
return state.value in goodStates
}

/**
* Used to pool together and store [IceCandidate]s before consuming them.
*/
private val pendingIceMutex = Mutex()
private val pendingIceCandidates = mutableListOf<IceCandidate>()

init {
logger.i { "<init> #sfu; #$typeTag; mediaConstraints: $mediaConstraints" }
}
Expand Down Expand Up @@ -182,15 +174,6 @@ public class StreamPeerConnection(
sessionDescription.description.mungeCodecs(),
),
)
}.also {
pendingIceMutex.withLock {
pendingIceCandidates.forEach { iceCandidate ->
val rtcIceCandidate = iceCandidate.toRtcCandidate()
logger.i { "[setRemoteDescription] #sfu; #subscriber; pendingRtcIceCandidate: $rtcIceCandidate" }
connection.addRtcIceCandidate(rtcIceCandidate)
}
pendingIceCandidates.clear()
}
}
}

Expand Down Expand Up @@ -228,13 +211,6 @@ public class StreamPeerConnection(
* @return An empty [Result], if the operation has been successful or not.
*/
public suspend fun addIceCandidate(iceCandidate: IceCandidate): Result<Unit> {
if (connection.remoteDescription == null) {
logger.w { "[addIceCandidate] #sfu; #$typeTag; postponed (no remoteDescription): $iceCandidate" }
pendingIceMutex.withLock {
pendingIceCandidates.add(iceCandidate)
}
return Failure(Error.GenericError(message = "RemoteDescription is not set"))
}
val rtcIceCandidate = iceCandidate.toRtcCandidate()
logger.d { "[addIceCandidate] #sfu; #$typeTag; rtcIceCandidate: $rtcIceCandidate" }
return connection.addRtcIceCandidate(rtcIceCandidate).also {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@ import android.os.Build
import io.getstream.log.taggedLogger
import io.getstream.video.android.core.BuildConfig
import io.getstream.video.android.core.call.signal.socket.RTCEventMapper
import io.getstream.video.android.core.call.utils.stringify
import io.getstream.video.android.core.dispatchers.DispatcherProvider
import io.getstream.video.android.core.events.ErrorEvent
import io.getstream.video.android.core.events.ICETrickleEvent
import io.getstream.video.android.core.events.JoinCallResponseEvent
import io.getstream.video.android.core.events.SfuSocketError
import io.getstream.video.android.core.internal.network.NetworkStateProvider
import io.getstream.video.android.core.model.IceCandidate
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
import okhttp3.OkHttpClient
import okhttp3.WebSocket
import okio.ByteString
Expand All @@ -38,6 +43,7 @@ import stream.video.sfu.event.SfuRequest
import stream.video.sfu.models.ClientDetails
import stream.video.sfu.models.Device
import stream.video.sfu.models.OS
import stream.video.sfu.models.PeerType
import stream.video.sfu.models.Sdk
import stream.video.sfu.models.SdkType

Expand Down Expand Up @@ -73,6 +79,9 @@ public class SfuSocket(
// Only set during SFU migration
private var migrationData: (suspend () -> Migration)? = null

internal val pendingPublisherIceCandidates = Channel<IceCandidate>(capacity = 99)
internal val pendingSubscriberIceCandidates = Channel<IceCandidate>(capacity = 99)

private val clientDetails
get() = ClientDetails(
os = OS(
Expand Down Expand Up @@ -193,6 +202,8 @@ public class SfuSocket(
logger.d { "[onMessage] SFU socket connected" }
setConnectedStateAndContinue(message)
}
} else if (message is ICETrickleEvent) {
handleIceTrickle(message)
}
} catch (error: Throwable) {
logger.e { "[onMessage] failed: $error" }
Expand All @@ -201,6 +212,19 @@ public class SfuSocket(
}
}

private suspend fun handleIceTrickle(event: ICETrickleEvent) {
logger.d {
"[handleIceTrickle] #sfu; #${event.peerType.stringify()}; candidate: ${event.candidate}"
}
val iceCandidate: IceCandidate = Json.decodeFromString(event.candidate)
val result = if (event.peerType == PeerType.PEER_TYPE_PUBLISHER_UNSPECIFIED) {
pendingPublisherIceCandidates.send(iceCandidate)
} else {
pendingSubscriberIceCandidates.send(iceCandidate)
}
logger.v { "[handleTrickle] #sfu; #${event.peerType.stringify()}; result: $result" }
}

private fun handleFastReconnectNotPossible() {
val fastReconnectError = Exception("SFU fast-reconnect failed, full reconnect required")
disconnect(DisconnectReason.PermanentError(fastReconnectError))
Expand Down

0 comments on commit db6012c

Please sign in to comment.