From 9b6e13fd78823c67ef3633d2537b82d2c6c34ede Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Wed, 23 Oct 2024 11:54:04 +0200 Subject: [PATCH] faster incremental FIDE sync, only writing what changed results in 1.5M reads to secondary, but saves 1.5M writes on the oplog. --- modules/common/src/main/mon.scala | 1 + modules/fide/src/main/FidePlayer.scala | 4 ++ modules/fide/src/main/FidePlayerSync.scala | 83 ++++++++++++---------- modules/fide/src/main/FideRepo.scala | 3 + 4 files changed, 55 insertions(+), 36 deletions(-) diff --git a/modules/common/src/main/mon.scala b/modules/common/src/main/mon.scala index 7474d32a4c6a..432222e5dd11 100644 --- a/modules/common/src/main/mon.scala +++ b/modules/common/src/main/mon.scala @@ -691,6 +691,7 @@ object mon: object fideSync: val time = future("fide.sync.time") val players = gauge("fide.sync.players").withoutTags() + val updated = gauge("fide.sync.updated").withoutTags() val deleted = gauge("fide.sync.deleted").withoutTags() object link: def external(tag: String, auth: Boolean) = counter("link.external").withTags: diff --git a/modules/fide/src/main/FidePlayer.scala b/modules/fide/src/main/FidePlayer.scala index 8c7786518b3a..d278a1ad945c 100644 --- a/modules/fide/src/main/FidePlayer.scala +++ b/modules/fide/src/main/FidePlayer.scala @@ -44,6 +44,10 @@ case class FidePlayer( def ratingsMap: Map[FideTC, Elo] = FideTC.values.flatMap(tc => ratingOf(tc).map(tc -> _)).toMap + def isSame(other: FidePlayer) = values == other.values + + private def values = (name, fed, title, standard, standardK, rapid, rapidK, blitz, blitzK, year, inactive) + def ratingsStr = List( "Standard" -> standard, "Rapid" -> rapid, diff --git a/modules/fide/src/main/FidePlayerSync.scala b/modules/fide/src/main/FidePlayerSync.scala index 57291e6c2440..d5b6042389a5 100644 --- a/modules/fide/src/main/FidePlayerSync.scala +++ b/modules/fide/src/main/FidePlayerSync.scala @@ -98,31 +98,35 @@ final private class FidePlayerSync(repo: FideRepo, ws: StandaloneWSClient)(using yield () private object playersFromHttpFile: - def apply(): Funit = - ws.url(listUrl) - .stream() - .flatMap: - case res if res.status == 200 => - val startAt = nowInstant - ZipInputStreamSource: () => - ZipInputStream(res.bodyAsSource.runWith(StreamConverters.asInputStream())) - .map(_._2) - .via(Framing.delimiter(akka.util.ByteString("\r\n"), maximumFrameLength = 200)) - .map(_.utf8String) - .drop(1) // first line is a header - .map(parseLine) - .mapConcat(_.toList) - .grouped(100) - .mapAsync(1)(upsert) - .runWith(lila.common.LilaStream.sinkSum) - .monSuccess(_.fideSync.time) - .flatMap: nb => - lila.mon.fideSync.players.update(nb) - setDeletedFlags(startAt).map: deleted => - lila.mon.fideSync.deleted.update(deleted) - logger.info(s"RelayFidePlayerApi.update upserted: $nb, deleted: $nb") - - case res => fufail(s"RelayFidePlayerApi.pull ${res.status} ${res.statusText}") + def apply(): Funit = for + httpStream <- ws.url(listUrl).stream() + _ <- + if httpStream.status != 200 then + fufail(s"RelayFidePlayerApi.pull ${httpStream.status} ${httpStream.statusText}") + else + val startAt = nowInstant + for + nbUpdated <- + ZipInputStreamSource: () => + ZipInputStream(httpStream.bodyAsSource.runWith(StreamConverters.asInputStream())) + .map(_._2) + .via(Framing.delimiter(akka.util.ByteString("\r\n"), maximumFrameLength = 200)) + .map(_.utf8String) + .drop(1) // first line is a header + .map(parseLine) + .mapConcat(_.toList) + .grouped(256) + .mapAsync(1)(saveIfChanged) + .runWith(lila.common.LilaStream.sinkSum) + .monSuccess(_.fideSync.time) + _ = lila.mon.fideSync.updated.update(nbUpdated) + nbAll <- repo.player.countAll + _ = lila.mon.fideSync.players.update(nbAll) + nbDeleted <- setDeletedFlags(startAt) + yield + lila.mon.fideSync.deleted.update(nbDeleted) + logger.info(s"RelayFidePlayerApi.update upserted: $nbUpdated, deleted: $nbDeleted") + yield () /* 6502938 Acevedo Mendez, Lisseth ISL F WIM WIM 1795 0 20 1767 14 20 1740 0 20 1993 w @@ -161,17 +165,24 @@ final private class FidePlayerSync(repo: FideRepo, ws: StandaloneWSClient)(using fetchedAt = nowInstant ) - private def upsert(ps: Seq[FidePlayer]): Future[Int] = - val update = repo.playerColl.update(ordered = false) - for - elements <- ps.toList.sequentially: p => - update.element( - q = $id(p.id), - u = repo.player.handler.writeOpt(p).get, - upsert = true - ) - _ <- elements.nonEmpty.so(update.many(elements).void) - yield elements.size + private def saveIfChanged(players: Seq[FidePlayer]): Future[Int] = + repo.player + .fetch(players.map(_.id)) + .flatMap: inDb => + val inDbMap: Map[FideId, FidePlayer] = inDb.mapBy(_.id) + val changed = players.filter: p => + inDbMap.get(p.id).fold(true)(i => !i.isSame(p)) + changed.nonEmpty.so: + val update = repo.playerColl.update(ordered = false) + for + elements <- changed.toList.sequentially: p => + update.element( + q = $id(p.id), + u = repo.player.handler.writeOpt(p).get, + upsert = true + ) + _ <- elements.nonEmpty.so(update.many(elements).void) + yield elements.size private def setDeletedFlags(date: Instant): Fu[Int] = for nbDeleted <- repo.playerColl.update diff --git a/modules/fide/src/main/FideRepo.scala b/modules/fide/src/main/FideRepo.scala index 9952775dd893..86a5ed2693af 100644 --- a/modules/fide/src/main/FideRepo.scala +++ b/modules/fide/src/main/FideRepo.scala @@ -17,6 +17,9 @@ final private class FideRepo( def selectFed(fed: hub.Federation.Id): Bdoc = $doc("fed" -> fed) def sortStandard: Bdoc = $sort.desc("standard") def fetch(id: FideId): Fu[Option[FidePlayer]] = playerColl.byId[FidePlayer](id) + def fetch(ids: Seq[FideId]): Fu[List[FidePlayer]] = + playerColl.find($inIds(ids)).cursor[FidePlayer](ReadPref.sec).listAll() + def countAll = playerColl.count() object federation: given BSONDocumentHandler[hub.Federation.Stats] = Macros.handler