diff --git a/app/controllers/RelayRound.scala b/app/controllers/RelayRound.scala index 45a72efe4251..051783c452cf 100644 --- a/app/controllers/RelayRound.scala +++ b/app/controllers/RelayRound.scala @@ -227,11 +227,7 @@ final class RelayRound( }(Unauthorized, Forbidden) def stats(id: RelayRoundId) = Open: - env.relay.stats - .get(id) - .map: stats => - import lila.relay.JsonView.given - JsonOk(stats) + env.relay.statsJson(id).map(JsonOk) private def WithRoundAndTour( @nowarn ts: String, @@ -274,7 +270,7 @@ final class RelayRound( case VideoEmbed.Auto => fuccess: rt.tour.pinnedStream - .ifFalse(rt.round.finished) + .ifFalse(rt.round.isFinished) .flatMap(_.upstream) .map(_.urls(netDomain).toPair) case VideoEmbed.No => fuccess(none) diff --git a/bin/mongodb/relay-round-finishedAt.js b/bin/mongodb/relay-round-finishedAt.js new file mode 100644 index 000000000000..064580b66663 --- /dev/null +++ b/bin/mongodb/relay-round-finishedAt.js @@ -0,0 +1,6 @@ +db.relay.find({ finished: true, finishedAt: { $exists: false } }).forEach(function(relay) { + const startAt = relay.startedAt || relay.startsAt || relay.createdAt; + const duration = 1000 * 60 * 60 * 3; // 3 hours + const finishAt = new Date(startAt.getTime() + duration); + db.relay.updateOne({ _id: relay._id }, { $set: { finishedAt: finishAt } }); +}); diff --git a/modules/common/src/main/mon.scala b/modules/common/src/main/mon.scala index 91f2b7585d96..c0826581fa73 100644 --- a/modules/common/src/main/mon.scala +++ b/modules/common/src/main/mon.scala @@ -283,6 +283,7 @@ object mon: private def relay(official: Boolean, id: RelayTourId, slug: String) = tags("by" -> by(official), "slug" -> s"$slug/$id") def ongoing(official: Boolean) = gauge("relay.ongoing").withTag("by", by(official)) + val crowdMonitor = gauge("relay.crowdMonitor").withoutTags() def games(official: Boolean, id: RelayTourId, slug: String) = gauge("relay.games").withTags(relay(official, id, slug)) def moves(official: Boolean, id: RelayTourId, slug: String) = diff --git a/modules/db/src/main/dsl.scala b/modules/db/src/main/dsl.scala index 96f0b71c5ecc..01cb17b18c02 100644 --- a/modules/db/src/main/dsl.scala +++ b/modules/db/src/main/dsl.scala @@ -105,6 +105,12 @@ trait dsl: "$unset" -> fields.nonEmpty.so($doc(fields.map(k => (k, BSONString(""))))) def $unset(field: String, fields: String*): Bdoc = $doc: "$unset" -> $doc((Seq(field) ++ fields).map(k => (k, BSONString("")))) + + def $unsetCompute[A](prev: A, next: A, fields: (String, A => Option[?])*): Bdoc = + $unset: + fields.flatMap: (key, accessor) => + (accessor(prev).isDefined && accessor(next).isEmpty).option(key) + def $setBoolOrUnset(field: String, value: Boolean): Bdoc = if value then $set(field -> true) else $unset(field) def $setsAndUnsets(items: (String, Option[BSONValue])*): Bdoc = diff --git a/modules/relay/src/main/Env.scala b/modules/relay/src/main/Env.scala index ab2ab24bc00e..eb5370eca845 100644 --- a/modules/relay/src/main/Env.scala +++ b/modules/relay/src/main/Env.scala @@ -65,8 +65,6 @@ final class Env( lazy val listing: RelayListing = wire[RelayListing] - lazy val stats = wire[RelayStatsApi] - lazy val api: RelayApi = wire[RelayApi] lazy val tourStream: RelayTourStream = wire[RelayTourStream] @@ -101,6 +99,10 @@ final class Env( private lazy val delay = wire[RelayDelay] + // eager init to start the scheduler + private val stats = wire[RelayStatsApi] + export stats.{ getJson as statsJson } + import SettingStore.CredentialsOption.given val proxyCredentials = settingStore[Option[Credentials]]( "relayProxyCredentials", diff --git a/modules/relay/src/main/JsonView.scala b/modules/relay/src/main/JsonView.scala index 42c97b8a96cb..d10e63382bb2 100644 --- a/modules/relay/src/main/JsonView.scala +++ b/modules/relay/src/main/JsonView.scala @@ -166,14 +166,15 @@ object JsonView: "slug" -> r.slug, "createdAt" -> r.createdAt ) - .add("finished" -> r.finished) - .add("ongoing" -> (r.hasStarted && !r.finished)) + .add("finishedAt" -> r.finishedAt) + .add("finished" -> r.isFinished) // BC + .add("ongoing" -> (r.hasStarted && !r.isFinished)) .add("startsAt" -> r.startsAtTime.orElse(r.startedAt)) .add("startsAfterPrevious" -> r.startsAfterPrevious) - given OWrites[RelayStats.RoundStats] = OWrites: r => + def statsJson(stats: RelayStats.RoundStats) = Json.obj( - "viewers" -> r.viewers.map: (minute, crowd) => + "viewers" -> stats.viewers.map: (minute, crowd) => Json.arr(minute * 60, crowd) ) diff --git a/modules/relay/src/main/RelayApi.scala b/modules/relay/src/main/RelayApi.scala index 8da8983051cf..2a12f42e3dd4 100644 --- a/modules/relay/src/main/RelayApi.scala +++ b/modules/relay/src/main/RelayApi.scala @@ -316,12 +316,12 @@ final class RelayApi( round <- copyRoundSourceSettings(updated) _ <- (from.name != round.name).so(studyApi.rename(round.studyId, round.name.into(StudyName))) setters <- tryBdoc(round).toEither.toFuture - unsetters = (from.caption.isDefined && updated.caption.isEmpty).option("caption").toList - _ <- roundRepo.coll.update.one($id(round.id), $set(setters) ++ $unset(unsetters)).void + unsets = $unsetCompute(from, updated, ("caption", _.caption), ("finishedAt", _.finishedAt)) + _ <- roundRepo.coll.update.one($id(round.id), $set(setters) ++ unsets).void _ <- (round.sync.playing != from.sync.playing) .so(sendToContributors(round.id, "relaySync", jsonView.sync(round))) _ <- denormalizeTour(round.tourId) - nextRoundToStart <- round.finished.so(nextRoundThatStartsAfterThisOneCompletes(round)) + nextRoundToStart <- round.isFinished.so(nextRoundThatStartsAfterThisOneCompletes(round)) _ <- nextRoundToStart.so(next => requestPlay(next.id, v = true)) yield round.sync.log.events.lastOption diff --git a/modules/relay/src/main/RelayFetch.scala b/modules/relay/src/main/RelayFetch.scala index be54491fe0c0..c3be1a1c90a7 100644 --- a/modules/relay/src/main/RelayFetch.scala +++ b/modules/relay/src/main/RelayFetch.scala @@ -109,9 +109,9 @@ final private class RelayFetch( nbGamesFinished > nbGamesUnstarted noMoreGamesSelected = games.isEmpty && allGamesInSource.nonEmpty autoFinishNow = rt.round.hasStarted && (allGamesFinishedOrUnstarted || noMoreGamesSelected) - roundUpdate = updating: - _.withSync(_.addLog(SyncLog.event(res.nbMoves, none))) - .copy(finished = autoFinishNow) + roundUpdate = updating: r => + r.withSync(_.addLog(SyncLog.event(res.nbMoves, none))) + .copy(finishedAt = r.finishedAt.orElse(autoFinishNow.option(nowInstant))) yield res -> roundUpdate syncFu .recover: diff --git a/modules/relay/src/main/RelayListing.scala b/modules/relay/src/main/RelayListing.scala index cb3b1a34dd1c..0fd6f41545b4 100644 --- a/modules/relay/src/main/RelayListing.scala +++ b/modules/relay/src/main/RelayListing.scala @@ -147,7 +147,7 @@ final class RelayListing( yield spotlightCache = active .filter(_.tour.spotlight.exists(_.enabled)) - .filterNot(_.display.finished) + .filterNot(_.display.isFinished) .filter: tr => tr.display.hasStarted || tr.display.startsAtTime.exists(_.isBefore(nowInstant.plusMinutes(30))) active @@ -256,7 +256,7 @@ private object RelayListing: .match case None => trs.rounds.headOption case Some(last) => - trs.rounds.find(!_.finished) match + trs.rounds.find(!_.isFinished) match case None => last.some case Some(next) => if next.startsAtTime.exists(_.isBefore(nowInstant.plusHours(1))) diff --git a/modules/relay/src/main/RelayPush.scala b/modules/relay/src/main/RelayPush.scala index abfb750329e8..4cd479727f3c 100644 --- a/modules/relay/src/main/RelayPush.scala +++ b/modules/relay/src/main/RelayPush.scala @@ -13,7 +13,6 @@ import lila.study.{ ChapterPreviewApi, MultiPgn, StudyPgnImport } final class RelayPush( sync: RelaySync, api: RelayApi, - stats: RelayStatsApi, chapterPreview: ChapterPreviewApi, fidePlayers: RelayFidePlayerApi, playerEnrich: RelayPlayerEnrich, @@ -61,13 +60,13 @@ final class RelayPush( case e: Exception => SyncLog.event(0, e.some) _ = if !rt.round.hasStarted && !rt.tour.official && event.hasMoves then irc.broadcastStart(rt.round.id, rt.fullName) - _ = stats.setActive(rt.round.id) allGamesFinished <- (games.nonEmpty && games.forall(_.points.isDefined)).so: chapterPreview.dataList(rt.round.studyId).map(_.forall(_.finished)) round <- api.update(rt.round): r1 => - val r2 = r1.withSync(_.addLog(event)) - val r3 = if event.hasMoves then r2.ensureStarted.resume(rt.tour.official) else r2 - r3.copy(finished = allGamesFinished) + val r2 = r1.withSync(_.addLog(event)) + val r3 = if event.hasMoves then r2.ensureStarted.resume(rt.tour.official) else r2 + val finishedAt = allGamesFinished.option(r3.finishedAt.|(nowInstant)) + r3.copy(finishedAt = finishedAt) _ <- andSyncTargets.so(api.syncTargetsOfSource(round)) yield () diff --git a/modules/relay/src/main/RelayRound.scala b/modules/relay/src/main/RelayRound.scala index fe258ba5a7b0..c5c4b6b63866 100644 --- a/modules/relay/src/main/RelayRound.scala +++ b/modules/relay/src/main/RelayRound.scala @@ -20,9 +20,10 @@ case class RelayRound( startedAt: Option[Instant], /* at least it *looks* finished... but maybe it's not * sync.nextAt is used for actually synchronising */ - finished: Boolean, + finishedAt: Option[Instant], createdAt: Instant, crowd: Option[Int] + // crowdAt: Option[Instant], // in DB but not used by RelayRound ): inline def studyId = id.into(StudyId) @@ -30,6 +31,8 @@ case class RelayRound( val s = scalalib.StringOps.slug(name.value) if s.isEmpty then "-" else s + def isFinished = finishedAt.isDefined + def startsAtTime = startsAt.flatMap: case RelayRound.Starts.At(at) => at.some case _ => none @@ -37,13 +40,13 @@ case class RelayRound( def finish = copy( - finished = true, + finishedAt = finishedAt.orElse(nowInstant.some), sync = sync.pause ) def resume(official: Boolean) = copy( - finished = false, + finishedAt = none, sync = sync.play(official) ) diff --git a/modules/relay/src/main/RelayRoundForm.scala b/modules/relay/src/main/RelayRoundForm.scala index 65c06a5b7ac6..0725fb0ae81b 100644 --- a/modules/relay/src/main/RelayRoundForm.scala +++ b/modules/relay/src/main/RelayRoundForm.scala @@ -217,7 +217,7 @@ object RelayRoundForm: caption = if Granter(_.StudyAdmin) then caption else relay.caption, sync = if relay.sync.playing then sync.play(official) else sync, startsAt = relayStartsAt, - finished = ~finished + finishedAt = finished.orZero.option(relay.finishedAt.|(nowInstant)) ) private def makeSync(prev: Option[RelayRound.Sync])(using Me): Sync = @@ -241,7 +241,7 @@ object RelayRoundForm: sync = makeSync(none), createdAt = nowInstant, crowd = none, - finished = ~finished, + finishedAt = (~finished).option(nowInstant), startsAt = relayStartsAt, startedAt = none ) @@ -267,7 +267,7 @@ object RelayRoundForm: case ids: Upstream.Ids => ids, startsAt = relay.startsAtTime, startsAfterPrevious = relay.startsAfterPrevious.option(true), - finished = relay.finished.option(true), + finished = relay.isFinished.option(true), period = relay.sync.period, onlyRound = relay.sync.onlyRound, slices = relay.sync.slices, diff --git a/modules/relay/src/main/RelayStatsApi.scala b/modules/relay/src/main/RelayStatsApi.scala index 989eef7f9ce7..b484e4439eed 100644 --- a/modules/relay/src/main/RelayStatsApi.scala +++ b/modules/relay/src/main/RelayStatsApi.scala @@ -4,13 +4,13 @@ import scalalib.cache.ExpireSetMemo import lila.db.dsl.{ *, given } -object RelayStats: +private object RelayStats: type Minute = Int type Crowd = Int type Graph = List[(Minute, Crowd)] case class RoundStats(viewers: Graph) -final class RelayStatsApi(colls: RelayColls)(using scheduler: Scheduler)(using +private final class RelayStatsApi(colls: RelayColls)(using scheduler: Scheduler)(using Executor ): import RelayStats.* @@ -28,10 +28,7 @@ final class RelayStatsApi(colls: RelayColls)(using scheduler: Scheduler)(using .toList .map(RoundStats.apply) - def setActive(id: RelayRoundId) = activeRounds.put(id) - - // keep monitoring rounds for some time after they stopped syncing - private val activeRounds = ExpireSetMemo[RelayRoundId](2 hours) + def getJson(id: RelayRoundId) = get(id).map(JsonView.statsJson) private def record(): Funit = for crowds <- fetchRoundCrowds @@ -63,26 +60,20 @@ final class RelayStatsApi(colls: RelayColls)(using scheduler: Scheduler)(using yield () private def fetchRoundCrowds: Fu[List[(RelayRoundId, Crowd)]] = - val max = 500 + val max = 200 colls.round .aggregateList(maxDocs = max, _.sec): framework => import framework.* - Match( - $doc( - $or( - $doc("sync.until" -> $exists(true)), - $inIds(activeRounds.keys) - ), - "crowd".$gt(0) - ) - ) -> - List(Project($doc("_id" -> 1, "crowd" -> 1, "syncing" -> "$sync.until"))) + // lila-ws sets crowdAt along with crowd + // so we can use crowdAt to know which rounds are being monitored + Match($doc("crowdAt".$gt(nowInstant.minusMinutes(1)))) -> + List(Project($doc("_id" -> 1, "crowd" -> 1))) .map: docs => + lila.mon.relay.crowdMonitor.update(docs.size) if docs.size == max then logger.warn(s"RelayStats.fetchRoundCrowds: $max docs fetched") for doc <- docs id <- doc.getAsOpt[RelayRoundId]("_id") crowd <- doc.getAsOpt[Crowd]("crowd") - _ = if doc.contains("syncing") then activeRounds.put(id) yield (id, crowd) diff --git a/modules/relay/src/main/ui/RelayFormUi.scala b/modules/relay/src/main/ui/RelayFormUi.scala index b9a9b5b3d42b..e3d58d352a7f 100644 --- a/modules/relay/src/main/ui/RelayFormUi.scala +++ b/modules/relay/src/main/ui/RelayFormUi.scala @@ -46,7 +46,7 @@ final class RelayFormUi(helpers: Helpers, ui: RelayUi, tourUi: RelayTourUi): href := routes.RelayRound.edit(r.id), cls := List("subnav__subitem text" -> true, "active" -> nav.roundId.has(r.id)), dataIcon := ( - if r.finished then Icon.Checkmark + if r.isFinished then Icon.Checkmark else if r.hasStarted then Icon.DiscBig else Icon.DiscOutline ) diff --git a/modules/tournament/src/main/TournamentStats.scala b/modules/tournament/src/main/TournamentStats.scala index 5332e9d8b1d2..cba23468e52f 100644 --- a/modules/tournament/src/main/TournamentStats.scala +++ b/modules/tournament/src/main/TournamentStats.scala @@ -15,12 +15,10 @@ final class TournamentStatsApi( private given BSONDocumentHandler[TournamentStats] = Macros.handler - private val cache = mongoCache[TourId, TournamentStats](64, "tournament:stats", 60 days, _.value) { - loader => - _.expireAfterAccess(10 minutes) - .maximumSize(256) - .buildAsyncFuture(loader(fetch)) - } + private val cache = mongoCache[TourId, TournamentStats](64, "tournament:stats", 60 days, _.value): loader => + _.expireAfterAccess(10 minutes) + .maximumSize(256) + .buildAsyncFuture(loader(fetch)) private def fetch(tournamentId: TourId): Fu[TournamentStats] = for