Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Livestream APIs for watchers #858

Merged
merged 4 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public final class io/getstream/video/android/core/CallState {
public final fun getIngress ()Lkotlinx/coroutines/flow/StateFlow;
public final fun getLive ()Lkotlinx/coroutines/flow/StateFlow;
public final fun getLiveDurationInMs ()Lkotlinx/coroutines/flow/StateFlow;
public final fun getLivestream ()Lkotlinx/coroutines/flow/StateFlow;
public final fun getLocalParticipant ()Lkotlinx/coroutines/flow/StateFlow;
public final fun getMe ()Lkotlinx/coroutines/flow/StateFlow;
public final fun getMember (Ljava/lang/String;)Lio/getstream/video/android/core/MemberState;
Expand Down Expand Up @@ -877,11 +878,9 @@ public final class io/getstream/video/android/core/call/RtcSession {
public final fun connect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun createPublisher ()Lio/getstream/video/android/core/call/connection/StreamPeerConnection;
public final fun createSubscriber ()Lio/getstream/video/android/core/call/connection/StreamPeerConnection;
public final fun getLocalTrack (Lstream/video/sfu/models/TrackType;)Lio/getstream/video/android/core/model/MediaTrack;
public final fun getPublisherStats (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun getSubscriber ()Lio/getstream/video/android/core/call/connection/StreamPeerConnection;
public final fun getSubscriberStats (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun getTrack (Ljava/lang/String;Lstream/video/sfu/models/TrackType;)Lio/getstream/video/android/core/model/MediaTrack;
public final fun getTrackDimensions ()Lkotlinx/coroutines/flow/MutableStateFlow;
public final fun getTrackDimensionsDebounced ()Lkotlinx/coroutines/flow/Flow;
public final fun getTracks ()Ljava/util/Map;
Expand All @@ -892,11 +891,8 @@ public final class io/getstream/video/android/core/call/RtcSession {
public final fun onNegotiationNeeded (Lio/getstream/video/android/core/call/connection/StreamPeerConnection;Lio/getstream/video/android/core/model/StreamPeerType;)V
public final fun reconnect (ZLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun requestSubscriberIceRestart (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun setLocalTrack (Lstream/video/sfu/models/TrackType;Lio/getstream/video/android/core/model/MediaTrack;)V
public final fun setMuteState (ZLstream/video/sfu/models/TrackType;)V
public final fun setScreenShareTrack ()V
public final fun setSubscriber (Lio/getstream/video/android/core/call/connection/StreamPeerConnection;)V
public final fun setTrack (Ljava/lang/String;Lstream/video/sfu/models/TrackType;Lio/getstream/video/android/core/model/MediaTrack;)V
public final fun setTracks (Ljava/util/Map;)V
public final fun switchSfu (Ljava/lang/String;Ljava/lang/String;Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun updateTrackDimensions (Ljava/lang/String;Lstream/video/sfu/models/TrackType;ZLstream/video/sfu/models/VideoDimension;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,8 @@ public class Call(
startTranscription: Boolean = false,
): Result<GoLiveResponse> {
val result = clientImpl.goLive(
type,
id,
type = type,
id = id,
startHls = startHls,
startRecording = startRecording,
startTranscription = startTranscription,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
Expand Down Expand Up @@ -136,15 +137,18 @@ public sealed interface RealtimeConnection {
/**
* True when the peer connections are ready
*/
public data object Connected : RealtimeConnection // connected to RTC, able to receive and send video
public data object Connected :
RealtimeConnection // connected to RTC, able to receive and send video

/**
* Reconnecting is true whenever Rtc isn't available and trying to recover
* If the subscriber peer connection breaks we'll reconnect
* If the publisher peer connection breaks we'll reconnect
* Also if the network provider from the OS says that internet is down we'll set it to reconnecting
*/
public data object Reconnecting : RealtimeConnection // reconnecting to recover from temporary issues
public data object Reconnecting :
RealtimeConnection // reconnecting to recover from temporary issues

public data class Failed(val error: Any) : RealtimeConnection // permanent failure
public data object Disconnected : RealtimeConnection // normal disconnect by the app
}
Expand Down Expand Up @@ -230,6 +234,40 @@ public class CallState(

val stats = CallStats(call, scope)

private val livestreamFlow: Flow<ParticipantState.Video?> = channelFlow {
fun emitLivestreamVideo() {
val participants = participants.value
val filteredVideo =
participants.mapNotNull { it.video.value }.firstOrNull { it.track != null }
scope.launch {
if (_backstage.value) {
send(null)
} else {
send(filteredVideo)
}
}
}

scope.launch {
_participants.collect {
emitLivestreamVideo()
}
}

// TODO: could optimize performance by subscribing only to relevant events
call.subscribe {
emitLivestreamVideo()
}

// emit livestream Video
emitLivestreamVideo()

awaitClose { }
}

val livestream: StateFlow<ParticipantState.Video?> = livestreamFlow.debounce(1000)
.stateIn(scope, SharingStarted.WhileSubscribed(10000L), null)

internal val sortedParticipantsFlow = channelFlow {
// uses a channel flow to handle concurrency and 3 things updating: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/channel-flow.html

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,14 +764,20 @@ internal class StreamVideoImpl internal constructor(
return wrapAPICall { connectionModule.api.endCall(type, id) }
}

suspend fun goLive(type: String, id: String, startHls: Boolean, startRecording: Boolean, startTranscription: Boolean): Result<GoLiveResponse> {
suspend fun goLive(
type: String,
id: String,
startHls: Boolean,
startRecording: Boolean,
startTranscription: Boolean,
): Result<GoLiveResponse> {
logger.d { "[goLive] callCid: $type:$id" }

return wrapAPICall {
connectionModule.api.goLive(
type,
id,
GoLiveRequest(
type = type,
id = id,
goLiveRequest = GoLiveRequest(
startHls = startHls,
startRecording = startRecording,
startTranscription = startTranscription,
Expand Down Expand Up @@ -846,6 +852,7 @@ internal class StreamVideoImpl internal constructor(
)
}
}

suspend fun startBroadcasting(type: String, id: String): Result<StartBroadcastingResponse> {
logger.d { "[startBroadcasting] callCid: $type $id" }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.getstream.log.taggedLogger
import io.getstream.result.Result
import io.getstream.result.Result.Failure
import io.getstream.result.Result.Success
import io.getstream.result.onSuccessSuspend
import io.getstream.video.android.core.Call
import io.getstream.video.android.core.DeviceStatus
import io.getstream.video.android.core.MediaManagerImpl
Expand Down Expand Up @@ -213,14 +214,14 @@ public class RtcSession internal constructor(
// It's cleaner to store here and have the participant state reference to it
var tracks: MutableMap<String, MutableMap<TrackType, MediaTrack>> = mutableMapOf()

fun getTrack(sessionId: String, type: TrackType): MediaTrack? {
private fun getTrack(sessionId: String, type: TrackType): MediaTrack? {
if (!tracks.containsKey(sessionId)) {
tracks[sessionId] = mutableMapOf()
}
return tracks[sessionId]?.get(type)
}

fun setTrack(sessionId: String, type: TrackType, track: MediaTrack) {
private fun setTrack(sessionId: String, type: TrackType, track: MediaTrack) {
if (!tracks.containsKey(sessionId)) {
tracks[sessionId] = mutableMapOf()
}
Expand Down Expand Up @@ -248,11 +249,11 @@ public class RtcSession internal constructor(
}
}

fun getLocalTrack(type: TrackType): MediaTrack? {
private fun getLocalTrack(type: TrackType): MediaTrack? {
return getTrack(sessionId, type)
}

fun setLocalTrack(type: TrackType, track: MediaTrack) {
private fun setLocalTrack(type: TrackType, track: MediaTrack) {
return setTrack(sessionId, type, track)
}

Expand Down Expand Up @@ -485,7 +486,7 @@ public class RtcSession internal constructor(
* Audio is available from the start.
* Video only becomes available after we update the subscription
*/
internal fun addStream(mediaStream: MediaStream) {
private fun addStream(mediaStream: MediaStream) {
val (trackPrefix, trackTypeString) = mediaStream.id.split(':')
val sessionId = trackPrefixToSessionIdMap.value[trackPrefix]

Expand Down Expand Up @@ -707,7 +708,7 @@ public class RtcSession internal constructor(
* -- error isn't permanent, SFU didn't change, the mute/publish state didn't change
* -- we cap at 30 retries to prevent endless loops
*/
fun setMuteState(isEnabled: Boolean, trackType: TrackType) {
private fun setMuteState(isEnabled: Boolean, trackType: TrackType) {
logger.d { "[setPublishState] #sfu; $trackType isEnabled: $isEnabled" }

// update the local copy
Expand All @@ -731,7 +732,7 @@ public class RtcSession internal constructor(
},
)
val result = updateMuteState(request)
emit(result.getOrThrow())
result.onSuccessSuspend { emit(result.getOrThrow()) }
}.flowOn(DispatcherProvider.IO).retryWhen { cause, attempt ->
val sameValue = new == muteState.value
val sameSfu = currentSfu == sfuUrl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@

package io.getstream.video.android.tutorial.livestream

import android.util.Log
import android.widget.Toast
import androidx.compose.foundation.background
import androidx.compose.foundation.layout.Arrangement
import androidx.compose.foundation.layout.Box
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.Row
import androidx.compose.foundation.layout.Spacer
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.layout.width
import androidx.compose.foundation.shape.RoundedCornerShape
import androidx.compose.material.Text
import androidx.compose.runtime.Composable
Expand All @@ -38,6 +42,7 @@ import androidx.compose.ui.graphics.Color
import androidx.compose.ui.platform.LocalContext
import androidx.compose.ui.unit.dp
import io.getstream.video.android.compose.theme.VideoTheme
import io.getstream.video.android.compose.ui.components.video.VideoRenderer
import io.getstream.video.android.core.Call
import io.getstream.video.android.core.GEO
import io.getstream.video.android.core.StreamVideoBuilder
Expand All @@ -51,9 +56,9 @@ fun LiveAudience() {

LaunchedEffect(key1 = Unit) {
val userToken =
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiQWRtaXJhbF9BY2tiYXIiLCJpc3MiOiJwcm9udG8iLCJzdWIiOiJ1c2VyL0FkbWlyYWxfQWNrYmFyIiwiaWF0IjoxNjkzNzk0NTc4LCJleHAiOjE2OTQzOTkzODN9.7uYF4xB1zUrQ1GIpsoICoU5G0DpXq_5_IDyohz6p3VU"
val userId = "Admiral_Ackbar"
val callId = "szua8Iy5iMX2"
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiQmVuX1NreXdhbGtlciIsImlzcyI6InByb250byIsInN1YiI6InVzZXIvQmVuX1NreXdhbGtlciIsImlhdCI6MTY5Njk4NDE3MywiZXhwIjoxNjk3NTg4OTc4fQ.Cdq_sw1ZA_PiGNXmOIZdxZjmlBKK8DuW8Oy_YjKloZw"
val userId = "Ben_Skywalker"
val callId = "dE8AsD5Qxqrt"

// step1 - create a user.
val user = User(
Expand All @@ -65,7 +70,7 @@ fun LiveAudience() {
// step2 - initialize StreamVideo. For a production app we recommend adding the client to your Application class or di module.
val client = StreamVideoBuilder(
context = context,
apiKey = "mmhfdzb5evj2", // demo API key
apiKey = "hd8szvscpxvd", // demo API key
geo = GEO.GlobalEdgeNetwork,
user = user,
token = userToken,
Expand All @@ -75,7 +80,7 @@ fun LiveAudience() {
// step3 - join a call, which type is `default` and id is `123`.
call = client.call("livestream", callId)

// join the call
// join the cal
val result = call?.join()
result?.onError {
Toast.makeText(context, "uh oh $it", Toast.LENGTH_SHORT).show()
Expand All @@ -93,10 +98,7 @@ private fun LiveGuestContent(call: Call) {
val totalParticipants by call.state.totalParticipants.collectAsState()
val backstage by call.state.backstage.collectAsState()
val duration by call.state.duration.collectAsState()

LaunchedEffect(key1 = participants) {
Log.e("Test", "participants: $participants")
}
val livestream by call.state.livestream.collectAsState()

Column(
modifier = Modifier
Expand All @@ -116,23 +118,39 @@ private fun LiveGuestContent(call: Call) {
color = VideoTheme.colors.textHighEmphasis,
)
} else {
Text(
modifier = Modifier
.align(Alignment.CenterEnd)
.background(
color = VideoTheme.colors.primaryAccent,
shape = RoundedCornerShape(6.dp),
Column(modifier = Modifier.fillMaxSize()) {
Row(
modifier = Modifier.fillMaxWidth(),
horizontalArrangement = Arrangement.Center,
verticalAlignment = Alignment.CenterVertically,
) {
Text(
modifier = Modifier
.background(
color = VideoTheme.colors.primaryAccent,
shape = RoundedCornerShape(6.dp),
)
.padding(horizontal = 16.dp, vertical = 4.dp),
text = "Live $totalParticipants",
color = Color.White,
)
.padding(horizontal = 12.dp, vertical = 4.dp),
text = "Live $totalParticipants",
color = Color.White,
)

Text(
modifier = Modifier.align(Alignment.Center),
text = "Live for $duration",
color = VideoTheme.colors.textHighEmphasis,
)
Spacer(modifier = Modifier.width(12.dp))

Text(
text = "Live for $duration",
color = VideoTheme.colors.textHighEmphasis,
)
}

VideoRenderer(
modifier = Modifier
.fillMaxSize()
.padding(6.dp),
call = call,
video = livestream,
)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ import androidx.compose.ui.draw.clip
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.platform.LocalContext
import androidx.compose.ui.unit.dp
import io.getstream.log.Priority
import io.getstream.video.android.compose.permission.LaunchCallPermissions
import io.getstream.video.android.compose.theme.VideoTheme
import io.getstream.video.android.compose.ui.components.video.VideoRenderer
import io.getstream.video.android.core.Call
import io.getstream.video.android.core.GEO
import io.getstream.video.android.core.RealtimeConnection
import io.getstream.video.android.core.StreamVideoBuilder
import io.getstream.video.android.core.logging.LoggingLevel
import io.getstream.video.android.model.User
import kotlinx.coroutines.launch

Expand All @@ -59,25 +61,26 @@ fun LiveHost() {

LaunchedEffect(key1 = Unit) {
val userToken =
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiQWRtaXJhbF9BY2tiYXIiLCJpc3MiOiJwcm9udG8iLCJzdWIiOiJ1c2VyL0FkbWlyYWxfQWNrYmFyIiwiaWF0IjoxNjkzNzk0NTc4LCJleHAiOjE2OTQzOTkzODN9.7uYF4xB1zUrQ1GIpsoICoU5G0DpXq_5_IDyohz6p3VU"
val userId = "Admiral_Ackbar"
val callId = "szua8Iy5iMX2"
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiRGFydGhfS3JheXQiLCJpc3MiOiJwcm9udG8iLCJzdWIiOiJ1c2VyL0RhcnRoX0tyYXl0IiwiaWF0IjoxNjk2OTgzMjk1LCJleHAiOjE2OTc1ODgxMDB9.g5K76Vv5D-uCoBfAfDpI3pyQIpoFMx8J9Eus0VkHk-M"
val userId = "Darth_Krayt"
val callId = "dE8AsD5Qxqrt"

// step1 - create a user.
val user = User(
id = userId, // any string
name = "Tutorial", // name and image are used in the UI
role = "admin",
role = "guest",
)

// step2 - initialize StreamVideo. For a production app we recommend adding the client to your Application class or di module.
val client = StreamVideoBuilder(
context = context,
apiKey = "mmhfdzb5evj2", // demo API key
apiKey = "hd8szvscpxvd", // demo API key
geo = GEO.GlobalEdgeNetwork,
user = user,
token = userToken,
ensureSingleInstance = false,
loggingLevel = LoggingLevel(priority = Priority.VERBOSE),
).build()

// step3 - join a call, which type is `default` and id is `123`.
Expand Down
Loading
Loading