Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/databricks): Fix profiling #12060

Merged
merged 34 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d259e91
Adding a new workunit processor to check correctness of an aspect
skrydal Dec 9, 2024
9b220ed
Better verbosity
skrydal Dec 9, 2024
87ef5cf
Adjustment
skrydal Dec 9, 2024
8c3f83b
Adjustment 2
skrydal Dec 9, 2024
3684130
Fixing output
skrydal Dec 9, 2024
54dc33e
More verbosity
skrydal Dec 9, 2024
622f780
Added verbosity to profiler
skrydal Dec 9, 2024
a66a0f3
Adding more robust problems handling
skrydal Dec 9, 2024
83e1c62
Corrected the workunit processor
skrydal Dec 12, 2024
96c54d5
Even more verbose logging
skrydal Dec 12, 2024
7458341
Linting
skrydal Dec 12, 2024
24ab30b
Removed incorrect debug log
skrydal Dec 13, 2024
aa8d649
More logging
skrydal Dec 13, 2024
3832aa9
Initial logic for reducing aspects size
skrydal Dec 13, 2024
613ed36
Fixing return
skrydal Dec 13, 2024
65ac369
Fixed sample values blanking
skrydal Dec 13, 2024
4dba84b
Refined approach
skrydal Dec 15, 2024
229572d
Linting
skrydal Dec 15, 2024
01b4203
Merge branch 'master' into fix_databricks_profiling
skrydal Dec 15, 2024
71b1144
Resolving conflicts
skrydal Dec 15, 2024
9333658
Fixed import
skrydal Dec 16, 2024
b37ee89
Added extensive testing for schema metadata aspect truncating
skrydal Dec 16, 2024
12693c1
Imports fix
skrydal Dec 16, 2024
83797ea
Merge branch 'master' into fix_databricks_profiling
skrydal Dec 16, 2024
c2ae385
Better logging
skrydal Dec 16, 2024
6ed6fe8
Revert "build(gradle): version change (Gradle and shadow plugin) (#11…
skrydal Dec 16, 2024
f77f047
Reapply "build(gradle): version change (Gradle and shadow plugin) (#1…
skrydal Dec 16, 2024
b260b7a
Removing redundant verbosity
skrydal Dec 16, 2024
957d8ac
Merge branch 'master' into fix_databricks_profiling
skrydal Dec 16, 2024
b415622
Adjusting logging
skrydal Dec 16, 2024
257d2f0
Adjusting again
skrydal Dec 16, 2024
34e3bb2
Merge branch 'master' into fix_databricks_profiling
skrydal Dec 19, 2024
3e94a4b
Using SourceReport for reporting aspects truncation
skrydal Dec 19, 2024
e1c99ab
Spelling change
skrydal Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion metadata-ingestion/src/datahub/emitter/rest_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ def emit_mcps(
mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
async_flag: Optional[bool] = None,
) -> int:
logger.debug("Attempting to emit batch mcps")
url = f"{self._gms_server}/aspects?action=ingestProposalBatch"
for mcp in mcps:
ensure_has_system_metadata(mcp)
Expand All @@ -303,15 +304,22 @@ def emit_mcps(
current_chunk_size = INGEST_MAX_PAYLOAD_BYTES
for mcp_obj in mcp_objs:
mcp_obj_size = len(json.dumps(mcp_obj))
logger.debug(
f"Iterating through object with size {mcp_obj_size} (type: {mcp_obj.get('aspectName')}"
)

if (
mcp_obj_size + current_chunk_size > INGEST_MAX_PAYLOAD_BYTES
or len(mcp_obj_chunks[-1]) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
):
logger.debug("Decided to create new chunk")
mcp_obj_chunks.append([])
current_chunk_size = 0
mcp_obj_chunks[-1].append(mcp_obj)
current_chunk_size += mcp_obj_size
logger.debug(
f"Decided to send {len(mcps)} mcps in {len(mcp_obj_chunks)} chunks"
)

for mcp_obj_chunk in mcp_obj_chunks:
# TODO: We're calling json.dumps on each MCP object twice, once to estimate
Expand All @@ -338,8 +346,15 @@ def emit_usage(self, usageStats: UsageAggregation) -> None:

def _emit_generic(self, url: str, payload: str) -> None:
curl_command = make_curl_command(self._session, "POST", url, payload)
payload_size = len(payload)
if payload_size > INGEST_MAX_PAYLOAD_BYTES:
# since we know total payload size here, we could simply avoid sending such payload at all and report a warning, with current approach we are going to cause whole ingestion to fail
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, I’m a bit confused about this comment.
If we send a payload that's too big, it’ll fail on the backend only and no impact in the ingestion pipeline, right?
this is how it works both before and with the changes in this PR, correct?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ingestion will be marked as failed if the payload size is exceeded (GMS will return 400).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it!

The backend may better respond with 413 Content Too Large and then ingestor may easily skip https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/413

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Whether to skip such error though - that's another question.

logger.warning(
f"Apparent payload size exceeded {INGEST_MAX_PAYLOAD_BYTES}, might fail with an exception due to the size"
)
logger.debug(
"Attempting to emit to DataHub GMS; using curl equivalent to:\n%s",
"Attempting to emit aspect (size: %s) to DataHub GMS; using curl equivalent to:\n%s",
payload_size,
curl_command,
)
try:
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it comes to truncating the profile, the warnings in the ingestion reports might be enough. But with schema, it could be a bigger deal, so it might be good to show in the UI if the schema’s been cut. We could either extend the SchemaMetadata model or just add a fixed field to flag that some fields are missing.

This is just my opinion, and it might need input from the product/UX folks to decide.

Of course, we could tackle this in later PRs to keep things moving for users affected by the issue.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it would be good to introduce a flag in the aspect itself to indicate truncation happening. We are actively truncating columns in BigQuery source if there are more than 300 (it is configurable though). Users were confused by this since no information of this trimming appeared in the UI.

Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import json
import logging
from typing import Iterable, List

from datahub.emitter.rest_emitter import INGEST_MAX_PAYLOAD_BYTES
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
DatasetProfileClass,
SchemaFieldClass,
SchemaMetadataClass,
)

logger = logging.getLogger(__name__)


def ensure_dataset_profile_size(profile: DatasetProfileClass) -> None:
"""
This is quite arbitrary approach to ensuring dataset profile aspect does not exceed allowed size, might be adjusted
in the future
"""
sample_fields_size = 0
if profile.fieldProfiles:
logger.debug(f"Length of field profiles: {len(profile.fieldProfiles)}")
for field in profile.fieldProfiles:
if field.sampleValues:
values_len = 0
for value in field.sampleValues:
if value:
values_len += len(value)
logger.debug(
f"Field {field.fieldPath} has {len(field.sampleValues)} sample values, taking total bytes {values_len}"
)
if sample_fields_size + values_len > INGEST_MAX_PAYLOAD_BYTES:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Different sink types might have their own limits. Even the limit could be a configuration parameter for the sink rather than sticking with INGEST_MAX_PAYLOAD_BYTES.

Again, just a possible future improvement!

logger.warning(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally we'd pass the source reporter, and then we'd do report.warning(...) and this would show up as a warning in the final ingestion report in the UI

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hsheth2 adjusted the code to use source report

f"Adding sample values for field {field.fieldPath} would exceed total allowed size of an aspect, therefor skipping them"
)
field.sampleValues = []
else:
sample_fields_size += values_len
else:
logger.debug(f"Field {field.fieldPath} has no sample values")


def ensure_schema_metadata_size(schema: SchemaMetadataClass) -> None:
"""
This is quite arbitrary approach to ensuring schema metadata aspect does not exceed allowed size, might be adjusted
in the future
"""
total_fields_size = 0
logger.debug(f"Amount of schema fields: {len(schema.fields)}")
accepted_fields: List[SchemaFieldClass] = []
for field in schema.fields:
field_size = len(json.dumps(pre_json_transform(field.to_obj())))
logger.debug(f"Field {field.fieldPath} takes total {field_size}")
if total_fields_size + field_size < INGEST_MAX_PAYLOAD_BYTES:
accepted_fields.append(field)
total_fields_size += field_size
else:
logger.warning(
f"Keeping field {field.fieldPath} would make aspect exceed total allowed size, therefor skipping it"
)
schema.fields = accepted_fields


def ensure_aspect_size(
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
"""
We have hard limitation of aspect size being 16 MB. Some aspects can exceed that value causing an exception
on GMS side and failure of the entire ingestion. This processor will attempt to trim suspected aspects.
"""
for wu in stream:
logger.debug(f"Ensuring size of workunit: {wu.id}")

if schema := wu.get_aspect_of_type(SchemaMetadataClass):
ensure_schema_metadata_size(schema)
elif profile := wu.get_aspect_of_type(DatasetProfileClass):
ensure_dataset_profile_size(profile)
yield wu
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
gen_containers,
)
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.auto_work_units.auto_ensure_aspect_size import (
ensure_aspect_size,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
Expand Down Expand Up @@ -260,6 +263,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
ensure_aspect_size,
]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
Expand Down
Loading
Loading