Skip to content

Commit

Permalink
Merge pull request #15517 from lenguyenthanh/use-lila-ingestor
Browse files Browse the repository at this point in the history
Stop using http request to index forum posts  as We use lila-search's ingestor for indexing
  • Loading branch information
ornicar authored Jun 16, 2024
2 parents 185c2f0 + 7ac6ae9 commit 1d8ac75
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 74 deletions.
1 change: 0 additions & 1 deletion modules/api/src/main/Cli.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ final private[api] class Cli(
private def processors =
security.cli.process
.orElse(teamSearch.cli.process)
.orElse(forumSearch.cli.process)
.orElse(tournament.cli.process)
.orElse(fishnet.cli.process)
.orElse(study.cli.process)
Expand Down
2 changes: 0 additions & 2 deletions modules/core/src/main/forum.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ import reactivemongo.api.bson.Macros.Annotations.Key
enum BusForum:
case CreatePost(post: ForumPostMini)
case RemovePost(id: ForumPostId, by: Option[UserId], text: String, asAdmin: Boolean)(using val me: MyId)
case RemovePosts(ids: List[ForumPostId])
// erasing = blanking, still in db but with empty text
case ErasePost(id: ForumPostId)
case ErasePosts(ids: List[ForumPostId])

object BusForum:
Expand Down
9 changes: 4 additions & 5 deletions modules/forum/src/main/ForumDelete.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ final class ForumDelete(
.allByUserCursor(user)
.documentSource()
.mapAsyncUnordered(4): post =>
postApi.viewOf(post).flatMap { _.so(deletePost) }
postApi.viewOf(post).flatMap(_.so(deletePost))
.runWith(Sink.ignore)
.void

def deleteTopic(view: PostView)(using Me): Funit =
for
postIds <- postRepo.idsByTopicId(view.topic.id)
_ <- postRepo.removeByTopic(view.topic.id)
_ <- topicRepo.remove(view.topic)
_ <- categApi.denormalize(view.categ)
_ <- postRepo.removeByTopic(view.topic.id)
_ <- topicRepo.remove(view.topic)
_ <- categApi.denormalize(view.categ)
yield publishDelete(view.post)

def deletePost(view: PostView)(using Me): Funit =
Expand Down
5 changes: 1 addition & 4 deletions modules/forum/src/main/ForumPostApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,11 @@ final class ForumPostApi(
postRepo.coll.update
.one($id(post.id), post.erase)
.void
.andDo:
Bus.pub(BusForum.ErasePost(post.id))

def eraseFromSearchIndex(user: User): Funit =
postRepo.coll
.distinctEasy[ForumPostId, List]("_id", $doc("userId" -> user.id), _.sec)
.map: ids =>
Bus.pub(BusForum.ErasePosts(ids))
.map(ids => Bus.pub(BusForum.ErasePosts(ids)))

def teamIdOfPostId(postId: ForumPostId): Fu[Option[TeamId]] =
postRepo.coll.byId[ForumPost](postId).flatMapz { post =>
Expand Down
24 changes: 3 additions & 21 deletions modules/forumSearch/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ import lila.search.client.SearchClient
import lila.search.spec.Query

@Module
private class ForumSearchConfig(
@ConfigName("paginator.max_per_page") val maxPerPage: MaxPerPage
)
private class ForumSearchConfig(@ConfigName("paginator.max_per_page") val maxPerPage: MaxPerPage)

final class Env(
appConfig: Configuration,
postApi: lila.core.forum.ForumPostApi,
client: SearchClient
)(using Executor, akka.stream.Materializer):
)(using Executor):

private val config = appConfig.get[ForumSearchConfig]("forumSearch")(AutoConfig.loader)

Expand All @@ -30,23 +28,7 @@ final class Env(
def apply(text: String, page: Int, troll: Boolean) =
paginatorBuilder(Query.forum(text.take(100), troll), page)

def cli: lila.common.Cli = new:
def process = {
case "forum" :: "search" :: "reset" :: Nil => api.reset.inject("done")
case "forum" :: "search" :: "backfill" :: epochSeconds :: Nil =>
Either
.catchNonFatal(java.lang.Long.parseLong(epochSeconds))
.fold(
e => fufail(s"Invalid epochSeconds: $e"),
since => api.backfill(java.time.Instant.ofEpochSecond(since)).inject("done")
)
}

private lazy val paginatorBuilder = lila.search.PaginatorBuilder(api, config.maxPerPage)

lila.common.Bus.sub[BusForum]:
case CreatePost(post) => api.store(post)
case RemovePost(id, _, _, _) => client.deleteById(index, id.value)
case RemovePosts(ids) => client.deleteByIds(index, ids.map(_.value))
case ErasePost(id) => client.deleteById(index, id.value)
case ErasePosts(ids) => client.deleteByIds(index, ids.map(_.value))
case ErasePosts(ids) => client.deleteByIds(index, ids.map(_.value))
43 changes: 2 additions & 41 deletions modules/forumSearch/src/main/ForumSearchApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,52 +8,13 @@ import lila.core.id.ForumPostId
import lila.search.client.SearchClient
import lila.search.spec.{ ForumSource, Query }

final class ForumSearchApi(
client: SearchClient,
postApi: ForumPostApi
)(using Executor, akka.stream.Materializer)
final class ForumSearchApi(client: SearchClient, postApi: ForumPostApi)(using Executor)
extends SearchReadApi[ForumPostId, Query.Forum]:

def search(query: Query.Forum, from: From, size: Size) =
client
.search(query, from.value, size.value)
.map: res =>
res.hitIds.map(ForumPostId.apply)
.map(res => res.hitIds.map(ForumPostId.apply))

def count(query: Query.Forum) =
client.count(query).dmap(_.count)

def store(post: ForumPostMini) =
postApi
.toMiniView(post)
.flatMapz: view =>
client.storeForum(view.post.id.value, toDoc(view))

private def toDoc(view: ForumPostMiniView) =
ForumSource(
body = view.post.text.take(10000),
topic = view.topic.name,
author = view.post.userId.map(_.value),
topicId = view.topic.id.value,
troll = view.post.troll,
date = view.post.createdAt.toEpochMilli()
)

def reset =
client.mapping(index) >>
readAndIndexPosts(none) >>
client.refresh(index)

def backfill(since: Instant) =
readAndIndexPosts(since.some)

private def readAndIndexPosts(since: Option[Instant]) =
postApi
.nonGhostCursor(since)
.documentSource()
.via(lila.common.LilaStream.logRate("forum index")(logger))
.grouped(200)
.mapAsync(1)(posts => postApi.toMiniViews(posts.toList))
.map(_.map(v => v.post.id.value -> toDoc(v)))
.mapAsyncUnordered(2)(client.storeBulkForum)
.runWith(Sink.ignore)

0 comments on commit 1d8ac75

Please sign in to comment.