fix(ingest/databricks): Fix profiling #12060
When it comes to truncating the profile, the warnings in the ingestion reports might be enough. But with schema, it could be a bigger deal, so it might be good to show in the UI if the schema has been cut. We could either extend the

This is just my opinion, and it might need input from the product/UX folks to decide. Of course, we could tackle this in later PRs to keep things moving for users affected by the issue.

I agree it would be good to introduce a flag in the aspect itself to indicate that truncation is happening. We are actively truncating columns in
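A minimal sketch of the flag idea discussed above. The `TruncationResult` container and `truncate_fields` helper are hypothetical and only illustrate the shape of the signal; the real change would presumably add a field to the aspect model itself or surface it in the UI.

from dataclasses import dataclass
from typing import List

from datahub.metadata.schema_classes import SchemaFieldClass


@dataclass
class TruncationResult:
    # Hypothetical container: illustrates carrying a "was truncated" signal
    # alongside the kept fields. Not part of the current metadata model.
    truncated: bool
    kept_fields: List[SchemaFieldClass]


def truncate_fields(
    fields: List[SchemaFieldClass], field_sizes: List[int], budget: int
) -> TruncationResult:
    """Keep fields until the size budget is exhausted and report whether anything was dropped."""
    kept: List[SchemaFieldClass] = []
    used = 0
    for field, size in zip(fields, field_sizes):
        if used + size < budget:
            kept.append(field)
            used += size
    return TruncationResult(truncated=len(kept) < len(fields), kept_fields=kept)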
@@ -0,0 +1,96 @@
import json
import logging
from typing import Iterable, List

from datahub.emitter.rest_emitter import INGEST_MAX_PAYLOAD_BYTES
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
    DatasetProfileClass,
    SchemaFieldClass,
    SchemaMetadataClass,
)

logger = logging.getLogger(__name__)


class EnsureAspectSizeProcessor:
    def __init__(
        self, report: SourceReport, payload_constraint: int = INGEST_MAX_PAYLOAD_BYTES
    ):
        self.report = report
        self.payload_constraint = payload_constraint

    def ensure_dataset_profile_size(
        self, dataset_urn: str, profile: DatasetProfileClass
    ) -> None:
        """
        This is a somewhat arbitrary approach to ensuring the dataset profile aspect does not
        exceed the allowed size; it might be adjusted in the future.
        """
        sample_fields_size = 0
        if profile.fieldProfiles:
            logger.debug(f"Length of field profiles: {len(profile.fieldProfiles)}")
            for field in profile.fieldProfiles:
                if field.sampleValues:
                    values_len = 0
                    for value in field.sampleValues:
                        if value:
                            values_len += len(value)
                    logger.debug(
                        f"Field {field.fieldPath} has {len(field.sampleValues)} sample values, taking total bytes {values_len}"
                    )
                    if sample_fields_size + values_len > self.payload_constraint:
                        field.sampleValues = []
                        self.report.warning(
                            title="Dataset profile truncated due to size constraint",
                            message="Dataset profile contained too much data and would have caused ingestion to fail",
                            context=f"Sample values for field {field.fieldPath} were removed from dataset profile for {dataset_urn} due to aspect size constraints",
                        )
                    else:
                        sample_fields_size += values_len
                else:
                    logger.debug(f"Field {field.fieldPath} has no sample values")

    def ensure_schema_metadata_size(
        self, dataset_urn: str, schema: SchemaMetadataClass
    ) -> None:
        """
        This is a somewhat arbitrary approach to ensuring the schema metadata aspect does not
        exceed the allowed size; it might be adjusted in the future.
        """
        total_fields_size = 0
        logger.debug(f"Amount of schema fields: {len(schema.fields)}")
        accepted_fields: List[SchemaFieldClass] = []
        for field in schema.fields:
            field_size = len(json.dumps(pre_json_transform(field.to_obj())))
            logger.debug(f"Field {field.fieldPath} takes total {field_size}")
            if total_fields_size + field_size < self.payload_constraint:
                accepted_fields.append(field)
                total_fields_size += field_size
            else:
                self.report.warning(
                    title="Schema truncated due to size constraint",
                    message="Dataset schema contained too much data and would have caused ingestion to fail",
                    context=f"Field {field.fieldPath} was removed from schema for {dataset_urn} due to aspect size constraints",
                )

        schema.fields = accepted_fields

    def ensure_aspect_size(
        self,
        stream: Iterable[MetadataWorkUnit],
    ) -> Iterable[MetadataWorkUnit]:
        """
        We have a hard limit of 16 MB on aspect size. Some aspects can exceed that value, causing
        an exception on the GMS side and failure of the entire ingestion. This processor will
        attempt to trim suspected aspects.
        """
        for wu in stream:
            logger.debug(f"Ensuring size of workunit: {wu.id}")

            if schema := wu.get_aspect_of_type(SchemaMetadataClass):
                self.ensure_schema_metadata_size(wu.get_urn(), schema)
            elif profile := wu.get_aspect_of_type(DatasetProfileClass):
                self.ensure_dataset_profile_size(wu.get_urn(), profile)
            yield wu
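A minimal usage sketch (not part of this PR) of how the processor could wrap a source's workunit stream so oversized schema/profile aspects are trimmed before emission. The `produce_workunits` generator is a stand-in for whatever the source actually yields.

from datahub.ingestion.api.source import SourceReport

report = SourceReport()
processor = EnsureAspectSizeProcessor(report)

def produce_workunits():
    # Placeholder for the source's real workunit generator.
    return iter([])

for wu in processor.ensure_aspect_size(produce_workunits()):
    ...  # hand each (possibly trimmed) workunit to the emitter as usual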
Hey, I'm a bit confused about this comment.
If we send a payload that's too big, it will fail only on the backend, with no impact on the ingestion pipeline, right?
This is how it works both before and with the changes in this PR, correct?
The ingestion will be marked as failed if the payload size is exceeded (GMS will return 400).
Got it!
The backend might better respond with 413 Content Too Large, and then the ingestor could easily skip it: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/413
Agreed. Whether to skip such an error, though, is another question.
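For illustration of the 413-skip idea, a sketch only; the endpoint and the skip-on-413 policy are assumptions, not the current emitter behavior.

import logging
import requests

logger = logging.getLogger(__name__)

def emit_payload(session: requests.Session, url: str, payload: dict) -> bool:
    """POST a payload and skip (rather than fail) when the server says it is too large.

    Hypothetical client-side handling for a 413 response; the real REST emitter's
    retry/error policy is not reproduced here.
    """
    response = session.post(url, json=payload)
    if response.status_code == 413:
        logger.warning("Payload too large (413); skipping this record instead of failing")
        return False
    response.raise_for_status()
    return True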