Skip to content

Commit

Permalink
feat(ingest): set pipeline name in system metadata (#10190)
Browse files Browse the repository at this point in the history
Co-authored-by: david-leifker <[email protected]>
  • Loading branch information
hsheth2 and david-leifker authored Jun 27, 2024
1 parent 0417e68 commit f4be88d
Show file tree
Hide file tree
Showing 31 changed files with 5,053 additions and 12,894 deletions.
17 changes: 9 additions & 8 deletions metadata-ingestion/docs/sources/datahub/datahub_recipe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ source:
enabled: true
ignore_old_state: false
urn_pattern:
deny:
# Ignores all datahub metadata where the urn matches the regex
- ^denied.urn.*
allow:
# Ingests all datahub metadata where the urn matches the regex.
- ^allowed.urn.*
extractor_config:
set_system_metadata: false # Replicate system metadata
deny:
# Ignores all datahub metadata where the urn matches the regex
- ^denied.urn.*
allow:
# Ingests all datahub metadata where the urn matches the regex.
- ^allowed.urn.*

flags:
set_system_metadata: false # Keep the system metadata read from the source DataHub instead of overwriting it

# Here, we write to a DataHub instance
# You can also use a different sink, e.g. to write the data to a file instead
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/check_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ def metadata_file(json_file: str, rewrite: bool, unpack_mces: bool) -> None:
"config": {"filename": json_file},
"extractor": "generic",
"extractor_config": {
"set_system_metadata": False,
"unpack_mces_into_mcps": unpack_mces,
},
},
"flags": {"set_system_metadata": False},
"sink": {
"type": "file",
"config": {"filename": out_file.name},
Expand Down
30 changes: 19 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/api/workunit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
from dataclasses import dataclass
from typing import Iterable, Optional, Type, TypeVar, Union, overload

from deprecated import deprecated

from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import WorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
)
from datahub.metadata.schema_classes import UsageAggregationClass, _Aspect
from datahub.metadata.schema_classes import _Aspect

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -158,11 +156,21 @@ def decompose_mce_into_mcps(self) -> Iterable["MetadataWorkUnit"]:
for mcpw in mcps_from_mce(self.metadata)
]


@deprecated
@dataclass
class UsageStatsWorkUnit(WorkUnit):
usageStats: UsageAggregationClass

def get_metadata(self) -> dict:
return {"usage": self.usageStats}
@classmethod
def from_metadata(
    cls,
    metadata: Union[
        MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper
    ],
    id: Optional[str] = None,
) -> "MetadataWorkUnit":
    """Build a ``MetadataWorkUnit`` around a single metadata object.

    Args:
        metadata: The MCE, raw MCP, or MCP wrapper to wrap.
        id: Workunit id to use. When omitted (or otherwise falsy), the id
            is obtained from ``cls.generate_workunit_id(metadata)``.

    Returns:
        A workunit carrying ``metadata`` under the constructor keyword that
        corresponds to its type (``mce``, ``mcp_raw`` or ``mcp``).

    Raises:
        ValueError: If ``metadata`` is none of the three supported types.
    """
    resolved_id = id if id else cls.generate_workunit_id(metadata)

    # Each supported metadata type is passed via its own constructor keyword.
    if isinstance(metadata, MetadataChangeEvent):
        return MetadataWorkUnit(id=resolved_id, mce=metadata)
    if isinstance(metadata, MetadataChangeProposal):
        return MetadataWorkUnit(id=resolved_id, mcp_raw=metadata)
    if isinstance(metadata, MetadataChangeProposalWrapper):
        return MetadataWorkUnit(id=resolved_id, mcp=metadata)

    raise ValueError(f"Unexpected metadata type {type(metadata)}")
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@
from typing import Iterable, Union

from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import get_sys_time
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import RecordEnvelope
from datahub.ingestion.api.source import Extractor, WorkUnit
from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
SystemMetadata,
)

logger = logging.getLogger(__name__)
Expand All @@ -26,10 +24,6 @@ def _try_reformat_with_black(code: str) -> str:


class WorkUnitRecordExtractorConfig(ConfigModel):
set_system_metadata: bool = True
set_system_metadata_pipeline_name: bool = (
False # false for now until the models are available in OSS
)
unpack_mces_into_mcps: bool = False


Expand Down Expand Up @@ -65,14 +59,6 @@ def get_records(
MetadataChangeProposalWrapper,
),
):
if self.config.set_system_metadata:
workunit.metadata.systemMetadata = SystemMetadata(
lastObserved=get_sys_time(), runId=self.ctx.run_id
)
if self.config.set_system_metadata_pipeline_name:
workunit.metadata.systemMetadata.pipelineName = (
self.ctx.pipeline_name
)
if (
isinstance(workunit.metadata, MetadataChangeEvent)
and len(workunit.metadata.proposedSnapshot.aspects) == 0
Expand All @@ -92,11 +78,6 @@ def get_records(
"workunit_id": workunit.id,
},
)
elif isinstance(workunit, UsageStatsWorkUnit):
logger.error(
"Dropping deprecated `UsageStatsWorkUnit`. "
"Emit a `MetadataWorkUnit` with the `datasetUsageStatistics` aspect instead."
)
else:
raise ValueError(f"unknown WorkUnit type {type(workunit)}")

Expand Down
6 changes: 6 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from datahub.ingestion.sink.sink_registry import sink_registry
from datahub.ingestion.source.source_registry import source_registry
from datahub.ingestion.transformer.system_metadata_transformer import (
SystemMetadataTransformer,
)
from datahub.ingestion.transformer.transform_registry import transform_registry
from datahub.metadata.schema_classes import MetadataChangeProposalClass
from datahub.telemetry import stats, telemetry
Expand Down Expand Up @@ -317,6 +320,9 @@ def _configure_transforms(self) -> None:
f"Transformer type:{transformer_type},{transformer_class} configured"
)

# Add the system metadata transformer at the end of the list.
self.transformers.append(SystemMetadataTransformer(self.ctx))

def _configure_reporting(
self, report_to: Optional[str], no_default_report: bool
) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ class FlagsConfig(ConfigModel):
),
)

set_system_metadata: bool = Field(
True, description="Set system metadata on entities."
)
set_system_metadata_pipeline_name: bool = Field(
True,
description="Set system metadata pipeline name. Requires `set_system_metadata` to be enabled.",
)


class PipelineConfig(ConfigModel):
source: SourceConfig
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from typing import Callable, Iterable, Optional, Union

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import ControlRecord, PipelineContext, RecordEnvelope
from datahub.ingestion.api.transform import Transformer
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
)


class AutoHelperTransformer(Transformer):
    """Adapts an ``auto_*`` source helper so it can run as a transformer.

    Usage caveat: the wrapped helper must be stateless. ``transform`` invokes
    the converter once per batch of records, so a helper that keeps state
    across calls, or that performs cleanup when its input stream ends, will
    not behave correctly here.
    """

    def __init__(
        self,
        converter: Callable[[Iterable[MetadataWorkUnit]], Iterable[MetadataWorkUnit]],
    ):
        """Store the workunit-stream converter that ``transform`` applies."""
        self.converter = converter

    def transform(
        self, record_envelopes: Iterable[RecordEnvelope]
    ) -> Iterable[RecordEnvelope]:
        """Run the converter over the non-control records of one batch.

        The incoming batch is fully consumed before anything is yielded.
        Converted records are yielded first, followed by the control records.
        """
        data_envelopes = []
        control_envelopes = []
        for envelope in record_envelopes:
            if isinstance(envelope.record, ControlRecord):
                control_envelopes.append(envelope)
            else:
                data_envelopes.append(envelope)

        workunits = self._into_workunits(data_envelopes)
        converted = self.converter(workunits)
        yield from self._from_workunits(converted)

        # Control records are forwarded untouched, after all converted records.
        # Strictly speaking this reorders them relative to the normal records,
        # which is acceptable because the only control record in use marks the
        # end of the stream.
        yield from control_envelopes

    @classmethod
    def _into_workunits(
        cls,
        stream: Iterable[
            RecordEnvelope[
                Union[
                    MetadataChangeEvent,
                    MetadataChangeProposal,
                    MetadataChangeProposalWrapper,
                ]
            ]
        ],
    ) -> Iterable[MetadataWorkUnit]:
        """Lazily turn record envelopes into workunits.

        The ``workunit_id`` entry of the envelope metadata, if present, is
        passed along as the workunit id.
        """
        for envelope in stream:
            carried_id: Optional[str] = envelope.metadata.get("workunit_id")
            payload = envelope.record
            yield MetadataWorkUnit.from_metadata(payload, id=carried_id)

    @classmethod
    def _from_workunits(
        cls, stream: Iterable[MetadataWorkUnit]
    ) -> Iterable[RecordEnvelope]:
        """Lazily wrap workunits back into envelopes, recording each id."""
        for unit in stream:
            yield RecordEnvelope(unit.metadata, {"workunit_id": unit.id})

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> Transformer:
        """Always raises: this transformer is not constructible from config."""
        raise NotImplementedError(f"{cls.__name__} cannot be created from config")
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import functools
from typing import Iterable

from datahub.emitter.mce_builder import get_sys_time
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.transform import Transformer
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.transformer.auto_helper_transformer import AutoHelperTransformer
from datahub.metadata.schema_classes import SystemMetadataClass


def auto_system_metadata(
    ctx: PipelineContext,
    stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
    """Stamp system metadata onto every workunit passing through the stream.

    Behaviour is driven by two flags on ``ctx.pipeline_config.flags``:

    * ``set_system_metadata``: when true, each workunit's ``systemMetadata``
      is replaced with a fresh ``SystemMetadataClass`` holding the current
      system time and ``ctx.run_id``. When false, workunits pass through
      unmodified.
    * ``set_system_metadata_pipeline_name``: when true (and the flag above is
      also true), ``ctx.pipeline_name`` is recorded as ``pipelineName``.

    This is a generator, so nothing (including the config check) runs until
    the result is first iterated.

    Raises:
        ValueError: If ``ctx.pipeline_config`` is missing or falsy.
    """
    pipeline_config = ctx.pipeline_config
    if not pipeline_config:
        raise ValueError("Pipeline config is required for system metadata")

    flags = pipeline_config.flags
    stamp_metadata = flags.set_system_metadata
    stamp_pipeline_name = flags.set_system_metadata_pipeline_name

    for workunit in stream:
        if stamp_metadata:
            system_metadata = SystemMetadataClass(
                lastObserved=get_sys_time(), runId=ctx.run_id
            )
            # The pipeline name is only meaningful on freshly stamped metadata,
            # hence this check lives inside the outer flag.
            if stamp_pipeline_name:
                system_metadata.pipelineName = ctx.pipeline_name
            workunit.metadata.systemMetadata = system_metadata

        yield workunit


class SystemMetadataTransformer(Transformer):
    """Transformer that applies ``auto_system_metadata`` to record batches.

    It delegates all work to an ``AutoHelperTransformer`` wrapping
    ``auto_system_metadata`` bound to the given pipeline context.
    """

    def __init__(self, ctx: PipelineContext):
        """Bind ``auto_system_metadata`` to ``ctx`` and wrap it for reuse."""
        stamp_stream = functools.partial(auto_system_metadata, ctx)
        # NOTE: the attribute name (including its spelling) is kept as-is so
        # any existing access to it keeps working.
        self._inner_transfomer = AutoHelperTransformer(stamp_stream)

    def transform(
        self, record_envelopes: Iterable[RecordEnvelope]
    ) -> Iterable[RecordEnvelope]:
        """Yield the envelopes produced by the wrapped helper transformer."""
        delegated = self._inner_transfomer.transform(record_envelopes)
        yield from delegated

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> Transformer:
        """Always raises: this transformer is not constructible from config."""
        raise NotImplementedError(f"{cls.__name__} cannot be created from config")
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ services:
- AWS_REGION=us-east-1
ports:
- 8888:8888
- 8080:8080
- 28080:8080
- 10000:10000
- 10001:10001
rest:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"owner": "root",
"created-at": "2024-05-22T14:09:15.234903700Z",
"created-at": "2024-06-27T17:29:32.492204247Z",
"write.format.default": "parquet",
"location": "s3a://warehouse/wh/nyc/another_taxis",
"format-version": "1",
"partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]",
"snapshot-id": "1706020810864905360",
"manifest-list": "s3a://warehouse/wh/nyc/another_taxis/metadata/snap-1706020810864905360-1-90ad8346-ac1b-4e73-bb30-dfd9b0b0e0dc.avro"
"snapshot-id": "1131595459662979239",
"manifest-list": "s3a://warehouse/wh/nyc/another_taxis/metadata/snap-1131595459662979239-1-0e80739b-774c-4eda-9d96-3a4c70873c32.avro"
},
"tags": []
}
Expand Down Expand Up @@ -150,7 +150,8 @@
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
},
{
Expand All @@ -167,7 +168,8 @@
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
},
{
Expand All @@ -183,7 +185,8 @@
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
}
]
Loading

0 comments on commit f4be88d

Please sign in to comment.