Skip to content

Commit

Permalink
refactor ElasticsearchIndexingService to ESMappingMetaStatsService, a…
Browse files Browse the repository at this point in the history
…nd move it to stats.py; add BuildDocMetaStatsService to stats.py; use BuildDocMetaStatsService in BaseVariantIndexer to avoid dirty write to build doc if this indexer is a hot indexer (as in a ColdHotIndexer)
  • Loading branch information
erikyao committed Aug 24, 2023
1 parent 9da9f7c commit 11c3f78
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 159 deletions.
63 changes: 22 additions & 41 deletions src/hub/dataindex/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from biothings.hub.dataindex.indexer import Indexer, IndexManager, ColdHotIndexer
from biothings.hub.dataexport.ids import export_ids, upload_ids
from biothings.utils.hub_db import get_src_build
from utils.es import ElasticsearchIndexingService
from utils.stats import ESMappingMetaStatsService, BuildDocMetaStatsService

from elasticsearch import JSONSerializer, SerializationError, Elasticsearch
from elasticsearch.compat import string_types
Expand Down Expand Up @@ -44,20 +44,16 @@ def dumps(self, data):
If false, these characters will be output as-is.
separators: an (item_separator, key_separator) tuple, specifying the separators in the output.
"""
# return json.dumps(
# data, default=self.default, ensure_ascii=False, separators=(",", ":")
# )
# return json.dumps(data, default=self.default, ensure_ascii=False, separators=(",", ":"))

"""
`orjson.dumps()` will escape all incoming non-ASCII characters and output the encoded bytestrings.
We decode the output bytestrings into string, and as a result, those escaped characters are un-escaped.
`orjson.dumps()` will escape all incoming non-ASCII characters and output the encoded byte-strings.
We decode the output byte-strings into string, and as a result, those escaped characters are un-escaped.
In Python 3, the default encoding is "utf-8" (see https://docs.python.org/3/library/stdtypes.html#bytes.decode).
`orjson.dumps()` will output compact JSON representation, effectively the same behavior with json.dumps(separators=(",", ":"))
"""
return orjson.dumps(
data, default=self.default
).decode()
return orjson.dumps(data, default=self.default).decode()
except (ValueError, TypeError) as e:
raise SerializationError(data, e)

Expand All @@ -67,13 +63,11 @@ class BaseVariantIndexer(Indexer):
def __init__(self, build_doc, indexer_env, index_name):
super().__init__(build_doc, indexer_env, index_name)

# TODO should we use _BuildDoc to wrap build_doc?
# TODO or should we make Indexer._build_doc a visible member?
self.build_doc = build_doc

# Changing the `es_client_args` object might affect top level config serialization.
# we have an endpoint to print the config, it might be safer to avoid changing the `es_client_args` object.
self.es_client_args = dict(self.es_client_args)
# self.es_client_args = dict(self.es_client_args)
self.logger.info(f"BaseVariantIndexer.__init__() received indexer_env={indexer_env}")

self.es_client_args["serializer"] = MyVariantJSONSerializer()

self.es_index_mappings["properties"]["chrom"] = {
Expand All @@ -91,57 +85,44 @@ def __init__(self, build_doc, indexer_env, index_name):
}
}
}

self.es_index_settings["mapping"] = {
"total_fields": {
"limit": 2000
}
}

self.assembly = build_doc["build_config"]["assembly"]

async def post_index(self, *args, **kwargs):
# No idea how come the decision to sleep for 3 minutes
# Migrated from Sebastian's commit 1a7b7a. It was orginally marked "Not Tested Yet".
# Migrated from Sebastian's commit 1a7b7a. It was originally marked "Not Tested Yet".
self.logger.info("Sleeping for a bit while index is being fully updated...")
await asyncio.sleep(3 * 60)

# update _meta.stats in ES mapping
# STEP 1: update _meta.stats in ES mapping
with Elasticsearch(**self.es_client_args) as es_client:
es_service = ElasticsearchIndexingService(client=es_client, index_name=self.es_index_name)
meta_stats = es_service.update_mapping_meta_stats(assembly=self.assembly)
mapping_service = ESMappingMetaStatsService(client=es_client, index_name=self.es_index_name)
meta_stats = mapping_service.update_mapping_meta_stats(assembly=self.assembly)
self.logger.info(f"_meta.stats updated to {meta_stats} for index {self.es_index_name}")

# update _meta.stats in myvariant_hubdb.src_build
# STEP 2: update _meta.stats in MongoDB myvariant_hubdb.src_build

# DO NOT use the following client because the `_build_doc.parse_backend()` in Indexer.__init__() won't work
# for a MyVariant build_doc. It results in `self.mongo_database_name` equal to "myvariant" and
# `self.mongo_collection_name` equal to `self.build_doc["_id"]`
# `self.mongo_collection_name` equal to `build_doc["_id"]`
# TODO revise if https://github.com/biothings/biothings.api/issues/238 fixed
#
# mongo_client = MongoClient(**self.mongo_client_args)
# mongo_database = mongo_client[self.mongo_database_name]
# mongo_collection = mongo_database[self.mongo_collection_name]
#
# TODO revise if https://github.com/biothings/biothings.api/issues/238 fixed
src_build_collection = get_src_build()
self.logger.debug(f"{src_build_collection.database.client}")
self.logger.debug(f"{src_build_collection.full_name}")

self.logger.info(f"_meta.stats of document {self.build_doc['_id']} is {self.build_doc['_meta']['stats']} "
f"before post-index update")
self.build_doc["_meta"]["stats"] = meta_stats
result = src_build_collection.replace_one({"_id": self.build_doc["_id"]}, self.build_doc)

if result.matched_count != 1:
raise ValueError(f"cannot find document {self.build_doc['_id']} "
f"in collection {src_build_collection.full_name}")

if result.modified_count != 1:
raise ValueError(f"failed to update _meta.stats for document {self.build_doc['_id']} "
f"in collection {src_build_collection.full_name}")

self.logger.info(f"_meta.stats updated to {meta_stats} for document {self.build_doc['_id']} "
f"in collection {src_build_collection.full_name}")
src_build = get_src_build()
build_service = BuildDocMetaStatsService(src_build=src_build, build_name=self.build_name, logger=self.logger)
build_service.update_build_meta_stats(meta_stats)

return meta_stats
# return nothing, otherwise the returned values would be written to the associated build_doc by PostIndexJSR
return


class MyVariantIndexerManager(IndexManager):
Expand Down
118 changes: 0 additions & 118 deletions src/utils/es.py

This file was deleted.

105 changes: 105 additions & 0 deletions src/utils/stats.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from biothings.utils.es import ESIndexer
from elasticsearch import Elasticsearch


def update_stats(idxer: ESIndexer, assembly):
Expand All @@ -14,3 +15,107 @@ def update_stats(idxer: ESIndexer, assembly):
m["_meta"].get("stats", {}).update(stats)
idxer.update_mapping_meta(m)
return m["_meta"]["stats"]


class ESReleaseException(Exception):
pass


class ESMappingMetaStatsService:
REQUIRED_MIN_RELEASE = 7

def __init__(self, client: Elasticsearch, index_name: str):
self.client = client
self.index_name = index_name

self._check_release(client, self.REQUIRED_MIN_RELEASE)

@classmethod
def _check_release(cls, client: Elasticsearch, required_min_release):
"""
Check if the current ES version is equal to or above the required minimum release.
E.g. if major release 8 is required, ES version "7.13.4" does not satisfy.
In this example, "7" is the actual major release, "13" the minor release, and "4" the maintenance release.
"""
version = client.info()['version']['number'] # a string like "7.13.4"
release = int(version.split('.')[0]) # an int like 7
if release < required_min_release:
raise ESReleaseException(f"Required ES minimum release is {required_min_release}, found version {version} installed.")

def update_mapping_meta_stats(self, assembly):
"""
Update the "stats" entry inside the "_meta" field of the index's mapping. Return the updated "meta._stats" field of the mapping.
Args:
assembly (str): a string of "hg19" or "hg38"
"""

"""
With ES7, self.client.indices.get_mapping(index_name) will return a dict like
{
"<index_name>": {
"mappings": {
"_meta": {
"build_date": "2021-08-29T15:43:59.554260-07:00",
"biothing_type": "variant",
"stats": {
"total": ...
"vcf": ...
"observed": ...
"<assembly>": ...
},
"src": {...},
"build_version": "20210829"
},
"properties": {...}
}
}
}
The goal here is to update the "stats" field inside.
"""

stats = dict()
stats["total"] = self.client.count(index=self.index_name)["count"]
for field in [assembly, "observed", "vcf"]:
body = {"query": {"exists": {"field": field}}}
stats[field] = self.client.count(index=self.index_name, body=body)["count"]

mapping = self.client.indices.get_mapping(index=self.index_name)
meta = mapping[self.index_name]["mappings"]["_meta"] # Get the current meta field from mapping
meta.get("stats", {}).update(stats) # Update the meta content
self.client.indices.put_mapping(body={"_meta": meta}, index=self.index_name) # Write the modified meta to ES mapping

return meta["stats"]


class BuildDocMetaStatsService:
def __init__(self, src_build, build_name: str, logger=None):
self.src_build = src_build
self.build_name = build_name
self.logger = logger

def update_build_meta_stats(self, meta_stats):
"""
Update the "stats" entry inside the "_meta" field of the build doc. Return the updated "meta._stats" field of the build_doc.
"""
build_doc = self.src_build.find_one({"_id": self.build_name})

if self.logger:
self.logger.info(f"_meta.stats of document {self.build_name} is {build_doc['_meta']['stats']} in collection {self.src_build.full_name} "
f"before post-index")

build_doc["_meta"].get("stats", {}).update(meta_stats)

result = self.src_build.replace_one({"_id": self.build_name}, build_doc)
if result.matched_count != 1:
raise ValueError(f"cannot find document {self.build_name} in collection {self.src_build.full_name}")
if result.modified_count != 1:
raise ValueError(f"failed to update _meta.stats in document {self.build_name} in collection {self.src_build.full_name}")

if self.logger:
self.logger.info(f"_meta.stats of document {self.build_name} is updated to {meta_stats} in collection {self.src_build.full_name} "
f"during post-index")

return build_doc["_meta"]["stats"]

0 comments on commit 11c3f78

Please sign in to comment.