Skip to content

Commit

Permalink
Feat: Xtream Account Info.
Browse files Browse the repository at this point in the history
  • Loading branch information
oxyroid committed Jun 1, 2024
1 parent d089cde commit bf29928
Show file tree
Hide file tree
Showing 10 changed files with 460 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ object Profiles {
val VIEWMODEL_PLAYLIST = Profile("viewmodel-playlist", Message.LEVEL_INFO)
val VIEWMODEL_SETTING = Profile("viewmodel-setting")
val VIEWMODEL_STREAM = Profile("viewmodel-stream")
val VIEWMODEL_PLAYLIST_CONFIGURATION = Profile("viewmodel-playlist-configuration")

val REPOS_PLAYLIST = Profile("repos-playlist")
val REPOS_STREAM = Profile("repos-stream")
Expand Down
59 changes: 59 additions & 0 deletions data/src/main/java/com/m3u/data/parser/ParserUtils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.m3u.data.parser

import com.m3u.core.architecture.logger.Logger
import com.m3u.core.architecture.logger.execute
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.withContext
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromStream
import kotlinx.serialization.json.decodeToSequence
import okhttp3.OkHttpClient
import okhttp3.Request

class ParserUtils(
val json: Json,
val okHttpClient: OkHttpClient,
val logger: Logger,
val ioDispatcher: CoroutineDispatcher
) {
@OptIn(ExperimentalSerializationApi::class)
suspend inline fun <reified T> newCall(url: String): T? = withContext(ioDispatcher) {
logger.execute {
okHttpClient.newCall(
Request.Builder().url(url).build()
)
.execute()
.takeIf { it.isSuccessful }
?.body
?.byteStream()
?.let { json.decodeFromStream(it) }
}
}

@OptIn(ExperimentalSerializationApi::class)
suspend inline fun <reified T> newCallOrThrow(url: String): T =
withContext(ioDispatcher) {
okHttpClient.newCall(
Request.Builder().url(url).build()
)
.execute()
.takeIf { it.isSuccessful }!!
.body!!
.byteStream()
.let { json.decodeFromStream(it) }
}

@OptIn(ExperimentalSerializationApi::class)
inline fun <reified T> newSequenceCall(url: String): Sequence<T> =
logger.execute {
okHttpClient.newCall(
Request.Builder().url(url).build()
)
.execute()
.takeIf { it.isSuccessful }
?.body
?.byteStream()
?.let { json.decodeToSequence(it) }
} ?: sequence { }
}
36 changes: 18 additions & 18 deletions data/src/main/java/com/m3u/data/parser/xtream/XtreamInfo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ import kotlinx.serialization.Serializable
@Serializable
data class XtreamInfo(
@SerialName("server_info")
val serverInfo: ServerInfo,
val serverInfo: ServerInfo = ServerInfo(),
@SerialName("user_info")
val userInfo: UserInfo
val userInfo: UserInfo = UserInfo()
) {
@Serializable
data class ServerInfo(
@SerialName("https_port")
val httpsPort: String?,
val httpsPort: String? = null,
@SerialName("port")
val port: String?,
val port: String? = null,
// @SerialName("rtmp_port")
// val rtmpPort: String?,
@SerialName("server_protocol")
val serverProtocol: String?,
val serverProtocol: String? = null,
// @SerialName("time_now")
// val timeNow: String?,
// @SerialName("timestamp_now")
Expand All @@ -32,25 +32,25 @@ data class XtreamInfo(

@Serializable
data class UserInfo(
// @SerialName("active_cons")
// val activeCons: String?,
@SerialName("active_cons")
val activeCons: String? = null,
@SerialName("allowed_output_formats")
val allowedOutputFormats: List<String>,
val allowedOutputFormats: List<String> = emptyList(),
// @SerialName("auth")
// val auth: Int?,
// @SerialName("created_at")
// val createdAt: String?,
// @SerialName("is_trial")
// val isTrial: String?,
// @SerialName("max_connections")
// val maxConnections: String?,
@SerialName("created_at")
val createdAt: String? = null,
@SerialName("is_trial")
val isTrial: String? = null,
@SerialName("max_connections")
val maxConnections: String? = null,
// @SerialName("message")
// val message: String?,
// @SerialName("password")
// val password: String?,
// @SerialName("status")
// val status: String?,
// @SerialName("username")
// val username: String?
@SerialName("status")
val status: String? = null,
@SerialName("username")
val username: String? = null
)
}
2 changes: 2 additions & 0 deletions data/src/main/java/com/m3u/data/parser/xtream/XtreamParser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ interface XtreamParser {

fun parse(input: XtreamInput): Flow<XtreamData>

suspend fun getInfo(input: XtreamInput): XtreamInfo

suspend fun getXtreamOutput(input: XtreamInput): XtreamOutput

companion object {
Expand Down
77 changes: 24 additions & 53 deletions data/src/main/java/com/m3u/data/parser/xtream/XtreamParserImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,18 @@ import com.m3u.core.architecture.dispatcher.Dispatcher
import com.m3u.core.architecture.dispatcher.M3uDispatchers.IO
import com.m3u.core.architecture.logger.Logger
import com.m3u.core.architecture.logger.Profiles
import com.m3u.core.architecture.logger.execute
import com.m3u.core.architecture.logger.install
import com.m3u.data.api.OkhttpClient
import com.m3u.data.database.model.DataSource
import com.m3u.data.parser.ParserUtils
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromStream
import kotlinx.serialization.json.decodeToSequence
import okhttp3.OkHttpClient
import okhttp3.Request
import java.time.Duration
import javax.inject.Inject

Expand All @@ -44,6 +40,21 @@ internal class XtreamParserImpl @Inject constructor(
.readTimeout(Duration.ofMillis(Int.MAX_VALUE.toLong()))
.build()

private val utils by lazy {
ParserUtils(
json = json,
okHttpClient = okHttpClient,
logger = logger,
ioDispatcher = ioDispatcher
)
}

override suspend fun getInfo(input: XtreamInput): XtreamInfo {
val (basicUrl, username, password, _) = input
val infoUrl = XtreamParser.createInfoUrl(basicUrl, username, password)
return checkNotNull(utils.newCall<XtreamInfo>(infoUrl))
}

override fun parse(input: XtreamInput): Flow<XtreamData> = channelFlow {
val (basicUrl, username, password, type) = input
val requiredLives = type == null || type == DataSource.Xtream.TYPE_LIVE
Expand All @@ -68,17 +79,17 @@ internal class XtreamParserImpl @Inject constructor(
XtreamParser.Action.GET_SERIES_STREAMS
)
if (requiredLives) launch {
newSequenceCall<XtreamLive>(liveStreamsUrl)
utils.newSequenceCall<XtreamLive>(liveStreamsUrl)
.asFlow()
.collect { live -> send(live) }
}
if (requiredVods) launch {
newSequenceCall<XtreamVod>(vodStreamsUrl)
utils.newSequenceCall<XtreamVod>(vodStreamsUrl)
.asFlow()
.collect { vod -> send(vod) }
}
if (requiredSeries) launch {
newSequenceCall<XtreamSerial>(seriesStreamsUrl)
utils.newSequenceCall<XtreamSerial>(seriesStreamsUrl)
.asFlow()
.collect { serial -> send(serial) }
}
Expand Down Expand Up @@ -108,18 +119,18 @@ internal class XtreamParserImpl @Inject constructor(
password,
XtreamParser.Action.GET_SERIES_CATEGORIES
)
val info: XtreamInfo = newCall(infoUrl) ?: return XtreamOutput()
val info: XtreamInfo = utils.newCall(infoUrl) ?: return XtreamOutput()
val allowedOutputFormats = info.userInfo.allowedOutputFormats
val serverProtocol = info.serverInfo.serverProtocol ?: "http"
val port = info.serverInfo.port?.toIntOrNull()
val httpsPort = info.serverInfo.httpsPort?.toIntOrNull()

val liveCategories: List<XtreamCategory> =
if (requiredLives) newCall(liveCategoriesUrl) ?: emptyList() else emptyList()
if (requiredLives) utils.newCall(liveCategoriesUrl) ?: emptyList() else emptyList()
val vodCategories: List<XtreamCategory> =
if (requiredVods) newCall(vodCategoriesUrl) ?: emptyList() else emptyList()
if (requiredVods) utils.newCall(vodCategoriesUrl) ?: emptyList() else emptyList()
val serialCategories: List<XtreamCategory> =
if (requiredSeries) newCall(serialCategoriesUrl) ?: emptyList() else emptyList()
if (requiredSeries) utils.newCall(serialCategoriesUrl) ?: emptyList() else emptyList()

return XtreamOutput(
liveCategories = liveCategories,
Expand All @@ -134,7 +145,7 @@ internal class XtreamParserImpl @Inject constructor(
override suspend fun getSeriesInfoOrThrow(input: XtreamInput, seriesId: Int): XtreamStreamInfo {
val (basicUrl, username, password, type) = input
check(type == DataSource.Xtream.TYPE_SERIES) { "xtream input type must be `series`" }
return newCallOrThrow(
return utils.newCallOrThrow(
XtreamParser.createActionUrl(
basicUrl,
username,
Expand All @@ -144,44 +155,4 @@ internal class XtreamParserImpl @Inject constructor(
)
)
}

@OptIn(ExperimentalSerializationApi::class)
private suspend inline fun <reified T> newCall(url: String): T? = withContext(ioDispatcher) {
logger.execute {
okHttpClient.newCall(
Request.Builder().url(url).build()
)
.execute()
.takeIf { it.isSuccessful }
?.body
?.byteStream()
?.let { json.decodeFromStream(it) }
}
}

@OptIn(ExperimentalSerializationApi::class)
private suspend inline fun <reified T> newCallOrThrow(url: String): T =
withContext(ioDispatcher) {
okHttpClient.newCall(
Request.Builder().url(url).build()
)
.execute()
.takeIf { it.isSuccessful }!!
.body!!
.byteStream()
.let { json.decodeFromStream(it) }
}

@OptIn(ExperimentalSerializationApi::class)
private inline fun <reified T> newSequenceCall(url: String): Sequence<T> =
logger.execute {
okHttpClient.newCall(
Request.Builder().url(url).build()
)
.execute()
.takeIf { it.isSuccessful }
?.body
?.byteStream()
?.let { json.decodeToSequence(it) }
} ?: sequence { }
}
Loading

0 comments on commit bf29928

Please sign in to comment.