Skip to content

Commit

Permalink
Use Flows for pending ICE candidates
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielNovak committed Oct 23, 2023
1 parent 719b14e commit efc76fd
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
Expand Down Expand Up @@ -1192,7 +1191,7 @@ public class RtcSession internal constructor(

syncSubscriberCandidates?.cancel()
syncSubscriberCandidates = coroutineScope.launch {
sfuConnectionModule.sfuSocket.pendingSubscriberIceCandidates.consumeEach { iceCandidates ->
sfuConnectionModule.sfuSocket.pendingSubscriberIceCandidates.collect { iceCandidates ->
subscriber.addIceCandidate(iceCandidates)
}
}
Expand Down Expand Up @@ -1339,7 +1338,7 @@ public class RtcSession internal constructor(

// start listening to ICE candidates
launch {
sfuConnectionModule.sfuSocket.pendingPublisherIceCandidates.consumeEach { iceCandidates ->
sfuConnectionModule.sfuSocket.pendingPublisherIceCandidates.collect { iceCandidates ->
publisher?.addIceCandidate(iceCandidates)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import io.getstream.video.android.core.model.IceCandidate
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
import okhttp3.OkHttpClient
Expand Down Expand Up @@ -79,8 +81,11 @@ public class SfuSocket(
// Only set during SFU migration
private var migrationData: (suspend () -> Migration)? = null

internal val pendingPublisherIceCandidates = Channel<IceCandidate>(capacity = 99)
internal val pendingSubscriberIceCandidates = Channel<IceCandidate>(capacity = 99)
private val _pendingPublisherIceCandidates = Channel<IceCandidate>(capacity = 99)
internal val pendingPublisherIceCandidates = _pendingPublisherIceCandidates.receiveAsFlow()

private val _pendingSubscriberIceCandidates = Channel<IceCandidate>(capacity = 99)
internal val pendingSubscriberIceCandidates = _pendingSubscriberIceCandidates.receiveAsFlow()

private val clientDetails
get() = ClientDetails(
Expand Down Expand Up @@ -206,6 +211,7 @@ public class SfuSocket(
handleIceTrickle(message)
}
} catch (error: Throwable) {
coroutineContext.ensureActive()
logger.e { "[onMessage] failed: $error" }
handleError(error)
}
Expand All @@ -218,9 +224,9 @@ public class SfuSocket(
}
val iceCandidate: IceCandidate = Json.decodeFromString(event.candidate)
val result = if (event.peerType == PeerType.PEER_TYPE_PUBLISHER_UNSPECIFIED) {
pendingPublisherIceCandidates.send(iceCandidate)
_pendingPublisherIceCandidates.send(iceCandidate)
} else {
pendingSubscriberIceCandidates.send(iceCandidate)
_pendingSubscriberIceCandidates.send(iceCandidate)
}
logger.v { "[handleTrickle] #sfu; #${event.peerType.stringify()}; result: $result" }
}
Expand Down

0 comments on commit efc76fd

Please sign in to comment.