Skip to content

Commit

Permalink
faster incremental FIDE sync, only writing what changed
Browse files Browse the repository at this point in the history
results in 1.5M reads to secondary,
but saves 1.5M writes on the oplog.
  • Loading branch information
ornicar committed Oct 23, 2024
1 parent 43781aa commit 9b6e13f
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 36 deletions.
1 change: 1 addition & 0 deletions modules/common/src/main/mon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ object mon:
object fideSync:
val time = future("fide.sync.time")
val players = gauge("fide.sync.players").withoutTags()
val updated = gauge("fide.sync.updated").withoutTags()
val deleted = gauge("fide.sync.deleted").withoutTags()
object link:
def external(tag: String, auth: Boolean) = counter("link.external").withTags:
Expand Down
4 changes: 4 additions & 0 deletions modules/fide/src/main/FidePlayer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ case class FidePlayer(

def ratingsMap: Map[FideTC, Elo] = FideTC.values.flatMap(tc => ratingOf(tc).map(tc -> _)).toMap

def isSame(other: FidePlayer) = values == other.values

private def values = (name, fed, title, standard, standardK, rapid, rapidK, blitz, blitzK, year, inactive)

def ratingsStr = List(
"Standard" -> standard,
"Rapid" -> rapid,
Expand Down
83 changes: 47 additions & 36 deletions modules/fide/src/main/FidePlayerSync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,31 +98,35 @@ final private class FidePlayerSync(repo: FideRepo, ws: StandaloneWSClient)(using
yield ()

private object playersFromHttpFile:
def apply(): Funit =
ws.url(listUrl)
.stream()
.flatMap:
case res if res.status == 200 =>
val startAt = nowInstant
ZipInputStreamSource: () =>
ZipInputStream(res.bodyAsSource.runWith(StreamConverters.asInputStream()))
.map(_._2)
.via(Framing.delimiter(akka.util.ByteString("\r\n"), maximumFrameLength = 200))
.map(_.utf8String)
.drop(1) // first line is a header
.map(parseLine)
.mapConcat(_.toList)
.grouped(100)
.mapAsync(1)(upsert)
.runWith(lila.common.LilaStream.sinkSum)
.monSuccess(_.fideSync.time)
.flatMap: nb =>
lila.mon.fideSync.players.update(nb)
setDeletedFlags(startAt).map: deleted =>
lila.mon.fideSync.deleted.update(deleted)
logger.info(s"RelayFidePlayerApi.update upserted: $nb, deleted: $nb")

case res => fufail(s"RelayFidePlayerApi.pull ${res.status} ${res.statusText}")
def apply(): Funit = for
httpStream <- ws.url(listUrl).stream()
_ <-
if httpStream.status != 200 then
fufail(s"RelayFidePlayerApi.pull ${httpStream.status} ${httpStream.statusText}")
else
val startAt = nowInstant
for
nbUpdated <-
ZipInputStreamSource: () =>
ZipInputStream(httpStream.bodyAsSource.runWith(StreamConverters.asInputStream()))
.map(_._2)
.via(Framing.delimiter(akka.util.ByteString("\r\n"), maximumFrameLength = 200))
.map(_.utf8String)
.drop(1) // first line is a header
.map(parseLine)
.mapConcat(_.toList)
.grouped(256)
.mapAsync(1)(saveIfChanged)
.runWith(lila.common.LilaStream.sinkSum)
.monSuccess(_.fideSync.time)
_ = lila.mon.fideSync.updated.update(nbUpdated)
nbAll <- repo.player.countAll
_ = lila.mon.fideSync.players.update(nbAll)
nbDeleted <- setDeletedFlags(startAt)
yield
lila.mon.fideSync.deleted.update(nbDeleted)
logger.info(s"RelayFidePlayerApi.update upserted: $nbUpdated, deleted: $nbDeleted")
yield ()

/*
6502938 Acevedo Mendez, Lisseth ISL F WIM WIM 1795 0 20 1767 14 20 1740 0 20 1993 w
Expand Down Expand Up @@ -161,17 +165,24 @@ final private class FidePlayerSync(repo: FideRepo, ws: StandaloneWSClient)(using
fetchedAt = nowInstant
)

private def upsert(ps: Seq[FidePlayer]): Future[Int] =
val update = repo.playerColl.update(ordered = false)
for
elements <- ps.toList.sequentially: p =>
update.element(
q = $id(p.id),
u = repo.player.handler.writeOpt(p).get,
upsert = true
)
_ <- elements.nonEmpty.so(update.many(elements).void)
yield elements.size
private def saveIfChanged(players: Seq[FidePlayer]): Future[Int] =
repo.player
.fetch(players.map(_.id))
.flatMap: inDb =>
val inDbMap: Map[FideId, FidePlayer] = inDb.mapBy(_.id)
val changed = players.filter: p =>
inDbMap.get(p.id).fold(true)(i => !i.isSame(p))
changed.nonEmpty.so:
val update = repo.playerColl.update(ordered = false)
for
elements <- changed.toList.sequentially: p =>
update.element(
q = $id(p.id),
u = repo.player.handler.writeOpt(p).get,
upsert = true
)
_ <- elements.nonEmpty.so(update.many(elements).void)
yield elements.size

private def setDeletedFlags(date: Instant): Fu[Int] = for
nbDeleted <- repo.playerColl.update
Expand Down
3 changes: 3 additions & 0 deletions modules/fide/src/main/FideRepo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ final private class FideRepo(
def selectFed(fed: hub.Federation.Id): Bdoc = $doc("fed" -> fed)
def sortStandard: Bdoc = $sort.desc("standard")
def fetch(id: FideId): Fu[Option[FidePlayer]] = playerColl.byId[FidePlayer](id)
def fetch(ids: Seq[FideId]): Fu[List[FidePlayer]] =
playerColl.find($inIds(ids)).cursor[FidePlayer](ReadPref.sec).listAll()
def countAll = playerColl.count()

object federation:
given BSONDocumentHandler[hub.Federation.Stats] = Macros.handler
Expand Down

0 comments on commit 9b6e13f

Please sign in to comment.