Skip to content

Commit

Permalink
feat: notification when refresh the programmes.
Browse files Browse the repository at this point in the history
feat: programme start time instant in playlist gallery channel item.
feat: time instant in programme gallery current time line.
  • Loading branch information
oxyroid committed May 4, 2024
1 parent c34ba3e commit 5651531
Show file tree
Hide file tree
Showing 15 changed files with 216 additions and 112 deletions.
7 changes: 7 additions & 0 deletions androidApp/src/main/java/com/m3u/androidApp/M3UApplication.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@ class M3UApplication : Application(), Configuration.Provider {
@Inject
lateinit var preferences: Preferences

// private val coroutineScope = CoroutineScope(SupervisorJob())

override fun onCreate() {
super.onCreate()
Thread.setDefaultUncaughtExceptionHandler(handler)
if (!resources.configuration.isTelevision() && !preferences.alwaysTv) {
DLNACastManager.bindCastService(this@M3UApplication)
}

// ResponseBodies.WebPage
// .onEach {
// }
// .launchIn(coroutineScope)
}

override val workManagerConfiguration: Configuration by lazy {
Expand Down
13 changes: 12 additions & 1 deletion data/src/main/java/com/m3u/data/api/ApiModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,18 @@ internal object ApiModule {
.addInterceptor { chain ->
val request = chain.request()
try {
chain.proceed(request)
chain.proceed(request).apply {
val body = body
if (body != null) {
val contentType = body.contentType()
val isWebPage = contentType != null &&
contentType.type == "text" &&
contentType.subtype == "html"
if (isWebPage) {
WebPageManager.push(body)
}
}
}
} catch (e: Exception) {
logger.log(e)
Response.Builder()
Expand Down
45 changes: 45 additions & 0 deletions data/src/main/java/com/m3u/data/api/WebPages.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.m3u.data.api

import androidx.lifecycle.Lifecycle
import androidx.lifecycle.flowWithLifecycle
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import okhttp3.ResponseBody

object WebPageManager {
private val source: MutableSharedFlow<ResponseBody> = MutableSharedFlow()
private val coroutineScope = CoroutineScope(SupervisorJob())
fun push(body: ResponseBody) {
source.tryEmit(body)
}

private val jobs = mutableListOf<Job>()

fun observe(block: (ResponseBody) -> Unit) {
jobs += source
.onEach { block(it) }
.launchIn(coroutineScope)
}

fun observe(lifecycle: Lifecycle, block: (ResponseBody) -> Unit) {
jobs += source
.onEach { block(it) }
.flowWithLifecycle(lifecycle)
.launchIn(coroutineScope)
}

fun removeAllObservers() {
val iterator = jobs.listIterator()
while (iterator.hasNext()) {
val job = iterator.next()
if (!job.isCompleted) {
job.cancel()
}
iterator.remove()
}
}
}
3 changes: 1 addition & 2 deletions data/src/main/java/com/m3u/data/parser/epg/EpgData.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import kotlinx.datetime.TimeZone
import kotlinx.datetime.toInstant
import kotlinx.datetime.toKotlinLocalDateTime
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter

@Immutable
Expand Down Expand Up @@ -42,7 +41,7 @@ data class EpgProgramme(

private val EPG_DATE_TIME_FORMATTER = DateTimeFormatter
.ofPattern("yyyyMMddHHmmss Z")
.withZone(ZoneId.of("GMT"))
// .withZone(ZoneId.of("GMT"))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ internal class PlaylistRepositoryImpl @Inject constructor(
}

DataSource.EPG -> {
SubscriptionWorker.epg(workManager, url, false)
SubscriptionWorker.epg(workManager, url, true)
}

DataSource.Xtream -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ interface ProgrammeRepository {
): Flow<ProgrammeRange>

val refreshingEpgUrls: StateFlow<List<String>>
suspend fun checkOrRefreshProgrammesOrThrow(
fun checkOrRefreshProgrammesOrThrow(
playlistUrl: String,
ignoreCache: Boolean
)
): Flow<Int>
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,19 @@ import com.m3u.core.architecture.dispatcher.M3uDispatchers.IO
import com.m3u.core.architecture.logger.Logger
import com.m3u.core.architecture.logger.Profiles
import com.m3u.core.architecture.logger.install
import com.m3u.core.architecture.logger.post
import com.m3u.data.api.OkhttpClient
import com.m3u.data.database.dao.PlaylistDao
import com.m3u.data.database.dao.ProgrammeDao
import com.m3u.data.database.model.DataSource
import com.m3u.data.database.model.Programme
import com.m3u.data.database.model.ProgrammeRange
import com.m3u.data.database.model.epgUrlsOrXtreamXmlUrl
import com.m3u.data.parser.epg.EpgParser
import com.m3u.data.parser.epg.EpgProgramme
import com.m3u.data.parser.epg.toProgramme
import com.m3u.data.parser.xtream.XtreamInput
import com.m3u.data.parser.xtream.XtreamParser
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.channelFlow
Expand Down Expand Up @@ -58,55 +56,48 @@ internal class ProgrammeRepositoryImpl @Inject constructor(
.observeProgrammeRange(epgUrls, channelId)
.filterNot { (start, _) -> start == 0L }

override suspend fun checkOrRefreshProgrammesOrThrow(
override fun checkOrRefreshProgrammesOrThrow(
playlistUrl: String,
ignoreCache: Boolean
) {
val playlist = playlistDao.getByUrl(playlistUrl) ?: return
when (playlist.source) {
DataSource.M3U -> checkOrRefreshProgrammesOrThrowImpl(playlist.epgUrls, ignoreCache)
DataSource.Xtream -> {
val input = XtreamInput.decodeFromPlaylistUrl(playlistUrl)
val epgUrl = XtreamParser.createXmlUrl(
basicUrl = input.basicUrl,
username = input.username,
password = input.password
)
checkOrRefreshProgrammesOrThrowXtreamImpl(
epgUrl = epgUrl,
ignoreCache = ignoreCache
)
}

else -> {}
): Flow<Int> = channelFlow {
val playlist = playlistDao.getByUrl(playlistUrl) ?: return@channelFlow
val epgUrls = playlist.epgUrlsOrXtreamXmlUrl()
val producer = checkOrRefreshProgrammesOrThrowImpl(
epgUrls = epgUrls,
ignoreCache = ignoreCache
)
var count = 0
producer.collect { programme ->
programmeDao.insertOrReplace(programme)
send(++count)
}
}

private suspend fun checkOrRefreshProgrammesOrThrowImpl(
private fun checkOrRefreshProgrammesOrThrowImpl(
epgUrls: List<String>,
ignoreCache: Boolean
) = coroutineScope {
): Flow<Programme> = channelFlow {
val now = Clock.System.now().toEpochMilliseconds()
// we call it job -s because we think deferred -s is sick.
val jobs = epgUrls.map { epgUrl ->
async {
if (epgUrl in refreshingEpgUrls.value) return@async
if (epgUrl in refreshingEpgUrls.value) run {
logger.post { "skipped! epgUrl is refreshing. [$epgUrl]" }
return@async
}
supervisorScope {
try {
refreshingEpgUrls.value += epgUrl
val cacheMaxEnd = programmeDao.getMaxEndByEpgUrl(epgUrl)
if (!ignoreCache && cacheMaxEnd != null && cacheMaxEnd > now) return@supervisorScope

val epgPlaylist = playlistDao.getByUrl(epgUrl) ?: return@supervisorScope
check(epgPlaylist.source == DataSource.EPG) {
"Playlist which be queried by epgUrl is not epg source but ${epgPlaylist.source}"
if (!ignoreCache && cacheMaxEnd != null && cacheMaxEnd > now) run {
logger.post { "skipped! exist validate programmes. [$epgUrl]" }
return@supervisorScope
}

programmeDao.cleanByEpgUrl(epgUrl)
downloadProgrammes(epgUrl)
.map { it.toProgramme(epgUrl) }
.collect { programme ->
programmeDao.insertOrReplace(programme)
}
.collect { send(it) }
} finally {
refreshingEpgUrls.value -= epgUrl
}
Expand All @@ -116,29 +107,6 @@ internal class ProgrammeRepositoryImpl @Inject constructor(
jobs.awaitAll()
}

private suspend fun checkOrRefreshProgrammesOrThrowXtreamImpl(
epgUrl: String,
ignoreCache: Boolean
): Unit = coroutineScope {
val now = Clock.System.now().toEpochMilliseconds()
try {
if (epgUrl in refreshingEpgUrls.value) return@coroutineScope
refreshingEpgUrls.value += epgUrl
val cacheMaxEnd = programmeDao.getMaxEndByEpgUrl(epgUrl)
if (!ignoreCache && cacheMaxEnd != null && cacheMaxEnd > now) return@coroutineScope

programmeDao.cleanByEpgUrl(epgUrl)

downloadProgrammes(epgUrl)
.map { it.toProgramme(epgUrl) }
.collect { programme ->
programmeDao.insertOrReplace(programme)
}
} finally {
refreshingEpgUrls.value -= epgUrl
}
}

private fun downloadProgrammes(epgUrl: String): Flow<EpgProgramme> = channelFlow {
val isGzip = epgUrl
.toHttpUrlOrNull()
Expand Down
31 changes: 23 additions & 8 deletions data/src/main/java/com/m3u/data/worker/SubscriptionWorker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import androidx.work.workDataOf
import com.m3u.core.architecture.logger.Logger
import com.m3u.core.architecture.logger.Profiles
import com.m3u.core.architecture.logger.install
import com.m3u.core.architecture.logger.post
import com.m3u.data.R
import com.m3u.data.database.model.DataSource
import com.m3u.data.parser.xtream.XtreamInput
Expand All @@ -33,6 +32,8 @@ import dagger.assisted.AssistedInject
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import java.util.concurrent.atomic.AtomicInteger

@HiltWorker
Expand Down Expand Up @@ -96,7 +97,7 @@ class SubscriptionWorker @AssistedInject constructor(
playlistRepository.m3uOrThrow(title, url) { count ->
total = count
val notification = createN10nBuilder()
.setContentText(findProgressContentText(count))
.setContentText(findChannelProgressContentText(count))
.setActions(cancelAction)
.setOngoing(true)
.build()
Expand All @@ -118,6 +119,14 @@ class SubscriptionWorker @AssistedInject constructor(
playlistUrl = playlistUrl,
ignoreCache = ignoreCache
)
.onEach { count ->
val notification = createN10nBuilder()
.setContentText(findProgrammeProgressContentText(count))
.setActions(cancelAction)
.build()
notificationManager.notify(notificationId, notification)
}
.launchIn(this)
Result.success()
} catch (e: Exception) {
createN10nBuilder()
Expand Down Expand Up @@ -151,13 +160,11 @@ class SubscriptionWorker @AssistedInject constructor(
) { count ->
total = count
val notification = createN10nBuilder()
.setContentText(findProgressContentText(count))
.setContentText(findChannelProgressContentText(count))
.setActions(cancelAction)
.build()
notificationManager.notify(notificationId, notification)
logger.post { "xtream callback" }
}
logger.post { "xtream suspend resumed" }
createN10nBuilder()
.setContentText(findCompleteContentText(total))
.buildThenNotify()
Expand Down Expand Up @@ -200,7 +207,12 @@ class SubscriptionWorker @AssistedInject constructor(
private fun createN10nBuilder(): Notification.Builder =
Notification.Builder(context, CHANNEL_ID)
.setSmallIcon(R.drawable.round_file_download_24)
.setContentTitle(title)
.setContentTitle(
when (dataSource) {
DataSource.EPG -> epgPlaylistUrl
else -> title
}
)

private fun findCancelActionTitle() =
context.getString(string.data_worker_subscription_action_cancel)
Expand All @@ -211,8 +223,11 @@ class SubscriptionWorker @AssistedInject constructor(
private fun findCompleteContentText(total: Int) =
context.getString(string.data_worker_subscription_content_completed, total)

private fun findProgressContentText(count: Int) =
context.getString(string.data_worker_subscription_content_progress, count)
private fun findChannelProgressContentText(count: Int) =
context.getString(string.data_worker_subscription_content_channel_progress, count)

private fun findProgrammeProgressContentText(count: Int) =
context.getString(string.data_worker_subscription_content_programme_progress, count)

private val cancelAction: Notification.Action by lazy {
Notification.Action.Builder(
Expand Down
Loading

0 comments on commit 5651531

Please sign in to comment.