From d0423547ba559c6059ffc35f9ed153036bf0e45d Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 27 Dec 2024 13:50:28 -0500 Subject: [PATCH] feat(ingest): add parse_ts_millis helper (#12231) --- .../assertion_circuit_breaker.py | 9 ++--- .../src/datahub/emitter/mce_builder.py | 18 +++++++++- .../src/datahub/emitter/mcp_builder.py | 9 ++--- .../src/datahub/emitter/mcp_patch_builder.py | 4 +-- .../src/datahub/emitter/rest_emitter.py | 4 +-- .../datahub/ingestion/api/source_helpers.py | 8 ++--- .../source/bigquery_v2/bigquery_schema.py | 25 +++---------- .../source/datahub/datahub_kafka_reader.py | 3 +- .../source/sql/sql_generic_profiler.py | 11 +++--- .../ingestion/source/state/checkpoint.py | 3 +- .../datahub/ingestion/source/unity/proxy.py | 35 +++++-------------- .../src/datahub/utilities/time.py | 11 ++++-- .../dbt_enabled_with_schemas_mces_golden.json | 10 +++--- .../dbt_test_column_meta_mapping_golden.json | 10 +++--- ...test_prefer_sql_parser_lineage_golden.json | 34 +++++++++--------- ...bt_test_test_model_performance_golden.json | 34 +++++++++--------- ...th_complex_owner_patterns_mces_golden.json | 10 +++--- ...th_data_platform_instance_mces_golden.json | 10 +++--- ...h_non_incremental_lineage_mces_golden.json | 10 +++--- ..._target_platform_instance_mces_golden.json | 10 +++--- .../tests/unit/sdk/test_mce_builder.py | 17 +++++++++ .../tests/unit/serde/test_codegen.py | 6 ++-- smoke-test/smoke.sh | 2 ++ 23 files changed, 145 insertions(+), 148 deletions(-) diff --git a/metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py b/metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py index 9d2a65663ba37..283cdaa833333 100644 --- a/metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py +++ b/metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py @@ -1,6 +1,6 @@ import logging from dataclasses import dataclass -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional from pydantic import Field @@ -10,6 +10,7 @@ CircuitBreakerConfig, ) from datahub.api.graphql import Assertion, Operation +from datahub.emitter.mce_builder import parse_ts_millis logger: logging.Logger = logging.getLogger(__name__) @@ -49,7 +50,7 @@ def get_last_updated(self, urn: str) -> Optional[datetime]: if not operations: return None else: - return datetime.fromtimestamp(operations[0]["lastUpdatedTimestamp"] / 1000) + return parse_ts_millis(operations[0]["lastUpdatedTimestamp"]) def _check_if_assertion_failed( self, assertions: List[Dict[str, Any]], last_updated: Optional[datetime] = None @@ -93,7 +94,7 @@ class AssertionResult: logger.info(f"Found successful assertion: {assertion_urn}") result = False if last_updated is not None: - last_run = datetime.fromtimestamp(last_assertion.time / 1000) + last_run = parse_ts_millis(last_assertion.time) if last_updated > last_run: logger.error( f"Missing assertion run for {assertion_urn}. The dataset was updated on {last_updated} but the last assertion run was at {last_run}" @@ -117,7 +118,7 @@ def is_circuit_breaker_active(self, urn: str) -> bool: ) if not last_updated: - last_updated = datetime.now() - self.config.time_delta + last_updated = datetime.now(tz=timezone.utc) - self.config.time_delta logger.info( f"Dataset {urn} doesn't have last updated or check_last_assertion_time is false, using calculated min assertion date {last_updated}" ) diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index 69946c575908b..110624aa61cb8 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -6,7 +6,7 @@ import os import re import time -from datetime import datetime +from datetime import datetime, timezone from enum import Enum from typing import ( TYPE_CHECKING, @@ -103,6 +103,22 @@ def make_ts_millis(ts: Optional[datetime]) -> Optional[int]: return int(ts.timestamp() * 1000) +@overload +def parse_ts_millis(ts: float) -> datetime: + ... + + +@overload +def parse_ts_millis(ts: None) -> None: + ... + + +def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]: + if ts is None: + return None + return datetime.fromtimestamp(ts / 1000, tz=timezone.utc) + + def make_data_platform_urn(platform: str) -> str: if platform.startswith("urn:li:dataPlatform:"): return platform diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py index 293157f8a1ed0..c8eb62a2e1de2 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py @@ -4,8 +4,8 @@ from pydantic.main import BaseModel from datahub.cli.env_utils import get_boolean_env_variable -from datahub.emitter.enum_helpers import get_enum_options from datahub.emitter.mce_builder import ( + ALL_ENV_TYPES, Aspect, datahub_guid, make_container_urn, @@ -25,7 +25,6 @@ ContainerClass, DomainsClass, EmbedClass, - FabricTypeClass, GlobalTagsClass, MetadataChangeEventClass, OwnerClass, @@ -206,11 +205,7 @@ def gen_containers( # Extra validation on the env field. # In certain cases (mainly for backwards compatibility), the env field will actually # have a platform instance name. - env = ( - container_key.env - if container_key.env in get_enum_options(FabricTypeClass) - else None - ) + env = container_key.env if container_key.env in ALL_ENV_TYPES else None container_urn = container_key.as_urn() diff --git a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py index 779b42e1e1ee9..1ed8ce1d5a615 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py @@ -2,7 +2,7 @@ import time from collections import defaultdict from dataclasses import dataclass -from typing import Any, Dict, Iterable, List, Optional, Sequence, Union +from typing import Any, Dict, List, Optional, Sequence, Union from datahub.emitter.aspect import JSON_PATCH_CONTENT_TYPE from datahub.emitter.serialization_helper import pre_json_transform @@ -75,7 +75,7 @@ def _add_patch( # TODO: Validate that aspectName is a valid aspect for this entityType self.patches[aspect_name].append(_Patch(op, path, value)) - def build(self) -> Iterable[MetadataChangeProposalClass]: + def build(self) -> List[MetadataChangeProposalClass]: return [ MetadataChangeProposalClass( entityUrn=self.urn, diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index 675717b5ec482..04242c8bf45d2 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -3,7 +3,7 @@ import logging import os from json.decoder import JSONDecodeError -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union import requests from deprecated import deprecated @@ -288,7 +288,7 @@ def emit_mcp( def emit_mcps( self, - mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]], + mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]], async_flag: Optional[bool] = None, ) -> int: logger.debug("Attempting to emit batch mcps") diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 7791ea2797be3..f3e5b1db6a1c8 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -1,5 +1,4 @@ import logging -from datetime import datetime, timezone from typing import ( TYPE_CHECKING, Dict, @@ -14,7 +13,7 @@ ) from datahub.configuration.time_window_config import BaseTimeWindowConfig -from datahub.emitter.mce_builder import make_dataplatform_instance_urn +from datahub.emitter.mce_builder import make_dataplatform_instance_urn, parse_ts_millis from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import entity_supports_aspect from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -479,10 +478,7 @@ def auto_empty_dataset_usage_statistics( if invalid_timestamps: logger.warning( f"Usage statistics with unexpected timestamps, bucket_duration={config.bucket_duration}:\n" - ", ".join( - str(datetime.fromtimestamp(ts / 1000, tz=timezone.utc)) - for ts in invalid_timestamps - ) + ", ".join(str(parse_ts_millis(ts)) for ts in invalid_timestamps) ) for bucket in bucket_timestamps: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 3ce34be8dc89d..cbe1f6eb97824 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -1,7 +1,7 @@ import logging from collections import defaultdict from dataclasses import dataclass, field -from datetime import datetime, timezone +from datetime import datetime from functools import lru_cache from typing import Any, Dict, FrozenSet, Iterable, Iterator, List, Optional @@ -15,6 +15,7 @@ TimePartitioningType, ) +from datahub.emitter.mce_builder import parse_ts_millis from datahub.ingestion.api.source import SourceReport from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier from datahub.ingestion.source.bigquery_v2.bigquery_helper import parse_labels @@ -393,13 +394,7 @@ def _make_bigquery_table( name=table.table_name, created=table.created, table_type=table.table_type, - last_altered=( - datetime.fromtimestamp( - table.get("last_altered") / 1000, tz=timezone.utc - ) - if table.get("last_altered") is not None - else None - ), + last_altered=parse_ts_millis(table.get("last_altered")), size_in_bytes=table.get("bytes"), rows_count=table.get("row_count"), comment=table.comment, @@ -460,11 +455,7 @@ def _make_bigquery_view(view: bigquery.Row) -> BigqueryView: return BigqueryView( name=view.table_name, created=view.created, - last_altered=( - datetime.fromtimestamp(view.get("last_altered") / 1000, tz=timezone.utc) - if view.get("last_altered") is not None - else None - ), + last_altered=(parse_ts_millis(view.get("last_altered"))), comment=view.comment, view_definition=view.view_definition, materialized=view.table_type == BigqueryTableType.MATERIALIZED_VIEW, @@ -705,13 +696,7 @@ def _make_bigquery_table_snapshot(snapshot: bigquery.Row) -> BigqueryTableSnapsh return BigqueryTableSnapshot( name=snapshot.table_name, created=snapshot.created, - last_altered=( - datetime.fromtimestamp( - snapshot.get("last_altered") / 1000, tz=timezone.utc - ) - if snapshot.get("last_altered") is not None - else None - ), + last_altered=parse_ts_millis(snapshot.get("last_altered")), comment=snapshot.comment, ddl=snapshot.ddl, snapshot_time=snapshot.snapshot_time, diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py index 56a3d55abb184..ba073533eccfb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py @@ -12,6 +12,7 @@ from confluent_kafka.schema_registry.avro import AvroDeserializer from datahub.configuration.kafka import KafkaConsumerConnectionConfig +from datahub.emitter.mce_builder import parse_ts_millis from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.datahub.config import DataHubSourceConfig @@ -92,7 +93,7 @@ def _poll_partition( if mcl.created and mcl.created.time > stop_time.timestamp() * 1000: logger.info( f"Stopped reading from kafka, reached MCL " - f"with audit stamp {datetime.fromtimestamp(mcl.created.time / 1000)}" + f"with audit stamp {parse_ts_millis(mcl.created.time)}" ) break diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py index bd6c23cc2d464..c91be9b494c00 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py @@ -7,7 +7,10 @@ from sqlalchemy import create_engine, inspect from sqlalchemy.engine.reflection import Inspector -from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance +from datahub.emitter.mce_builder import ( + make_dataset_urn_with_platform_instance, + parse_ts_millis, +) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.ge_data_profiler import ( @@ -245,11 +248,7 @@ def is_dataset_eligible_for_profiling( # If profiling state exists we have to carry over to the new state self.state_handler.add_to_state(dataset_urn, last_profiled) - threshold_time: Optional[datetime] = ( - datetime.fromtimestamp(last_profiled / 1000, timezone.utc) - if last_profiled - else None - ) + threshold_time: Optional[datetime] = parse_ts_millis(last_profiled) if ( not threshold_time and self.config.profiling.profile_if_updated_since_days is not None diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py b/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py index 5bfd48eb754d5..2c7a4a8b6c137 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py @@ -12,6 +12,7 @@ import pydantic from datahub.configuration.common import ConfigModel +from datahub.emitter.mce_builder import parse_ts_millis from datahub.metadata.schema_classes import ( DatahubIngestionCheckpointClass, IngestionCheckpointStateClass, @@ -144,7 +145,7 @@ def create_from_checkpoint_aspect( ) logger.info( f"Successfully constructed last checkpoint state for job {job_name} " - f"with timestamp {datetime.fromtimestamp(checkpoint_aspect.timestampMillis/1000, tz=timezone.utc)}" + f"with timestamp {parse_ts_millis(checkpoint_aspect.timestampMillis)}" ) return checkpoint return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py index 11827bace4b5a..9b96953794dcd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py @@ -4,7 +4,7 @@ import dataclasses import logging -from datetime import datetime, timezone +from datetime import datetime from typing import Any, Dict, Iterable, List, Optional, Union, cast from unittest.mock import patch @@ -27,6 +27,7 @@ from databricks.sdk.service.workspace import ObjectType import datahub +from datahub.emitter.mce_builder import parse_ts_millis from datahub.ingestion.source.unity.hive_metastore_proxy import HiveMetastoreProxy from datahub.ingestion.source.unity.proxy_profiling import ( UnityCatalogProxyProfilingMixin, @@ -211,16 +212,8 @@ def workspace_notebooks(self) -> Iterable[Notebook]: id=obj.object_id, path=obj.path, language=obj.language, - created_at=( - datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc) - if obj.created_at - else None - ), - modified_at=( - datetime.fromtimestamp(obj.modified_at / 1000, tz=timezone.utc) - if obj.modified_at - else None - ), + created_at=parse_ts_millis(obj.created_at), + modified_at=parse_ts_millis(obj.modified_at), ) def query_history( @@ -452,17 +445,9 @@ def _create_table( properties=obj.properties or {}, owner=obj.owner, generation=obj.generation, - created_at=( - datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc) - if obj.created_at - else None - ), + created_at=(parse_ts_millis(obj.created_at) if obj.created_at else None), created_by=obj.created_by, - updated_at=( - datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc) - if obj.updated_at - else None - ), + updated_at=(parse_ts_millis(obj.updated_at) if obj.updated_at else None), updated_by=obj.updated_by, table_id=obj.table_id, comment=obj.comment, @@ -500,12 +485,8 @@ def _create_query(self, info: QueryInfo) -> Optional[Query]: query_id=info.query_id, query_text=info.query_text, statement_type=info.statement_type, - start_time=datetime.fromtimestamp( - info.query_start_time_ms / 1000, tz=timezone.utc - ), - end_time=datetime.fromtimestamp( - info.query_end_time_ms / 1000, tz=timezone.utc - ), + start_time=parse_ts_millis(info.query_start_time_ms), + end_time=parse_ts_millis(info.query_end_time_ms), user_id=info.user_id, user_name=info.user_name, executed_as_user_id=info.executed_as_user_id, diff --git a/metadata-ingestion/src/datahub/utilities/time.py b/metadata-ingestion/src/datahub/utilities/time.py index 0df7afb19935f..e8338ce068c84 100644 --- a/metadata-ingestion/src/datahub/utilities/time.py +++ b/metadata-ingestion/src/datahub/utilities/time.py @@ -1,6 +1,8 @@ import time from dataclasses import dataclass -from datetime import datetime, timezone +from datetime import datetime + +from datahub.emitter.mce_builder import make_ts_millis, parse_ts_millis def get_current_time_in_seconds() -> int: @@ -9,12 +11,15 @@ def get_current_time_in_seconds() -> int: def ts_millis_to_datetime(ts_millis: int) -> datetime: """Converts input timestamp in milliseconds to a datetime object with UTC timezone""" - return datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc) + return parse_ts_millis(ts_millis) def datetime_to_ts_millis(dt: datetime) -> int: """Converts a datetime object to timestamp in milliseconds""" - return int(round(dt.timestamp() * 1000)) + # TODO: Deprecate these helpers in favor of make_ts_millis and parse_ts_millis. + # The other ones support None with a typing overload. + # Also possibly move those helpers to this file. + return make_ts_millis(dt) @dataclass diff --git a/metadata-ingestion/tests/integration/dbt/dbt_enabled_with_schemas_mces_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_enabled_with_schemas_mces_golden.json index dc8c400b29157..fb25531e68526 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_enabled_with_schemas_mces_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_enabled_with_schemas_mces_golden.json @@ -2658,7 +2658,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1580505371997, + "time": 1580505371996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -2930,7 +2930,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1582319845997, + "time": 1582319845996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3180,7 +3180,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1584998318997, + "time": 1584998318996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3430,7 +3430,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1588287228997, + "time": 1588287228996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3680,7 +3680,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1589460269997, + "time": 1589460269996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_column_meta_mapping_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_test_column_meta_mapping_golden.json index 60f5bf4fbca9a..69c4b9cce0b17 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_column_meta_mapping_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_column_meta_mapping_golden.json @@ -3024,7 +3024,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1580505371997, + "time": 1580505371996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3296,7 +3296,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1582319845997, + "time": 1582319845996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3546,7 +3546,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1584998318997, + "time": 1584998318996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3796,7 +3796,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1588287228997, + "time": 1588287228996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -4046,7 +4046,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1589460269997, + "time": 1589460269996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_prefer_sql_parser_lineage_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_test_prefer_sql_parser_lineage_golden.json index 42a416473ae24..0361e899b5b39 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_prefer_sql_parser_lineage_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_prefer_sql_parser_lineage_golden.json @@ -564,7 +564,7 @@ "name": "just-some-random-id_urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.an-aliased-view-for-monthly-billing,PROD)", "type": "BATCH_SCHEDULED", "created": { - "time": 1663355198240, + "time": 1663355198239, "actor": "urn:li:corpuser:datahub" } } @@ -636,7 +636,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198240, + "timestampMillis": 1663355198239, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -657,7 +657,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198242, + "timestampMillis": 1663355198241, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -1019,7 +1019,7 @@ "name": "just-some-random-id_urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.an_aliased_view_for_payments,PROD)", "type": "BATCH_SCHEDULED", "created": { - "time": 1663355198240, + "time": 1663355198239, "actor": "urn:li:corpuser:datahub" } } @@ -1095,7 +1095,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198240, + "timestampMillis": 1663355198239, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -1116,7 +1116,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198242, + "timestampMillis": 1663355198241, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -1347,7 +1347,7 @@ "name": "just-some-random-id_urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payments_by_customer_by_month,PROD)", "type": "BATCH_SCHEDULED", "created": { - "time": 1663355198240, + "time": 1663355198239, "actor": "urn:li:corpuser:datahub" } } @@ -1418,7 +1418,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198240, + "timestampMillis": 1663355198239, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -1439,7 +1439,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198242, + "timestampMillis": 1663355198241, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -1871,7 +1871,7 @@ "name": "just-some-random-id_urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer_snapshot,PROD)", "type": "BATCH_SCHEDULED", "created": { - "time": 1663355198240, + "time": 1663355198239, "actor": "urn:li:corpuser:datahub" } } @@ -1942,7 +1942,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198240, + "timestampMillis": 1663355198239, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -1963,7 +1963,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198242, + "timestampMillis": 1663355198241, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -3140,7 +3140,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1580505371997, + "time": 1580505371996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3341,7 +3341,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1582319845997, + "time": 1582319845996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3523,7 +3523,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1584998318997, + "time": 1584998318996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3705,7 +3705,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1588287228997, + "time": 1588287228996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3887,7 +3887,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1589460269997, + "time": 1589460269996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_test_model_performance_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_test_test_model_performance_golden.json index c281ea3eed0fa..c59620f010343 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_test_model_performance_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_test_model_performance_golden.json @@ -564,7 +564,7 @@ "name": "just-some-random-id_urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.an-aliased-view-for-monthly-billing,PROD)", "type": "BATCH_SCHEDULED", "created": { - "time": 1663355198240, + "time": 1663355198239, "actor": "urn:li:corpuser:datahub" } } @@ -636,7 +636,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198240, + "timestampMillis": 1663355198239, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -657,7 +657,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198242, + "timestampMillis": 1663355198241, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -1019,7 +1019,7 @@ "name": "just-some-random-id_urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.an_aliased_view_for_payments,PROD)", "type": "BATCH_SCHEDULED", "created": { - "time": 1663355198240, + "time": 1663355198239, "actor": "urn:li:corpuser:datahub" } } @@ -1095,7 +1095,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198240, + "timestampMillis": 1663355198239, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -1116,7 +1116,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198242, + "timestampMillis": 1663355198241, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -1347,7 +1347,7 @@ "name": "just-some-random-id_urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.payments_by_customer_by_month,PROD)", "type": "BATCH_SCHEDULED", "created": { - "time": 1663355198240, + "time": 1663355198239, "actor": "urn:li:corpuser:datahub" } } @@ -1418,7 +1418,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198240, + "timestampMillis": 1663355198239, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -1439,7 +1439,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198242, + "timestampMillis": 1663355198241, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -1871,7 +1871,7 @@ "name": "just-some-random-id_urn:li:dataset:(urn:li:dataPlatform:dbt,pagila.public.customer_snapshot,PROD)", "type": "BATCH_SCHEDULED", "created": { - "time": 1663355198240, + "time": 1663355198239, "actor": "urn:li:corpuser:datahub" } } @@ -1942,7 +1942,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198240, + "timestampMillis": 1663355198239, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -1963,7 +1963,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1663355198242, + "timestampMillis": 1663355198241, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -3504,7 +3504,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1580505371997, + "time": 1580505371996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3773,7 +3773,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1582319845997, + "time": 1582319845996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -4023,7 +4023,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1584998318997, + "time": 1584998318996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -4273,7 +4273,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1588287228997, + "time": 1588287228996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -4523,7 +4523,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1589460269997, + "time": 1589460269996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_with_complex_owner_patterns_mces_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_test_with_complex_owner_patterns_mces_golden.json index 495fa32569f56..23b5525b712d0 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_with_complex_owner_patterns_mces_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_with_complex_owner_patterns_mces_golden.json @@ -2598,7 +2598,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1580505371997, + "time": 1580505371996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -2867,7 +2867,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1582319845997, + "time": 1582319845996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3117,7 +3117,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1584998318997, + "time": 1584998318996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3367,7 +3367,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1588287228997, + "time": 1588287228996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3617,7 +3617,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1589460269997, + "time": 1589460269996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_with_data_platform_instance_mces_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_test_with_data_platform_instance_mces_golden.json index 20b7cf4a1c26c..da22458f5624c 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_with_data_platform_instance_mces_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_with_data_platform_instance_mces_golden.json @@ -2610,7 +2610,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1580505371997, + "time": 1580505371996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -2880,7 +2880,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1582319845997, + "time": 1582319845996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3131,7 +3131,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1584998318997, + "time": 1584998318996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3382,7 +3382,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1588287228997, + "time": 1588287228996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3633,7 +3633,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1589460269997, + "time": 1589460269996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_with_non_incremental_lineage_mces_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_test_with_non_incremental_lineage_mces_golden.json index 80ca85a5e6c61..0b44fe77cd62a 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_with_non_incremental_lineage_mces_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_with_non_incremental_lineage_mces_golden.json @@ -2599,7 +2599,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1580505371997, + "time": 1580505371996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -2868,7 +2868,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1582319845997, + "time": 1582319845996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3118,7 +3118,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1584998318997, + "time": 1584998318996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3368,7 +3368,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1588287228997, + "time": 1588287228996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3618,7 +3618,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1589460269997, + "time": 1589460269996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", diff --git a/metadata-ingestion/tests/integration/dbt/dbt_test_with_target_platform_instance_mces_golden.json b/metadata-ingestion/tests/integration/dbt/dbt_test_with_target_platform_instance_mces_golden.json index 1e6e4d8ba94a2..3174847dd7e7a 100644 --- a/metadata-ingestion/tests/integration/dbt/dbt_test_with_target_platform_instance_mces_golden.json +++ b/metadata-ingestion/tests/integration/dbt/dbt_test_with_target_platform_instance_mces_golden.json @@ -2599,7 +2599,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1580505371997, + "time": 1580505371996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -2868,7 +2868,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1582319845997, + "time": 1582319845996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3118,7 +3118,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1584998318997, + "time": 1584998318996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3368,7 +3368,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1588287228997, + "time": 1588287228996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", @@ -3618,7 +3618,7 @@ "actor": "urn:li:corpuser:unknown" }, "lastModified": { - "time": 1589460269997, + "time": 1589460269996, "actor": "urn:li:corpuser:dbt_executor" }, "hash": "", diff --git a/metadata-ingestion/tests/unit/sdk/test_mce_builder.py b/metadata-ingestion/tests/unit/sdk/test_mce_builder.py index d7c84f7863b40..3bdbf07bf28b7 100644 --- a/metadata-ingestion/tests/unit/sdk/test_mce_builder.py +++ b/metadata-ingestion/tests/unit/sdk/test_mce_builder.py @@ -1,3 +1,5 @@ +from datetime import datetime, timezone + import datahub.emitter.mce_builder as builder from datahub.metadata.schema_classes import ( DataFlowInfoClass, @@ -55,3 +57,18 @@ def test_make_group_urn() -> None: assert ( builder.make_group_urn("urn:li:corpuser:someUser") == "urn:li:corpuser:someUser" ) + + +def test_ts_millis() -> None: + assert builder.make_ts_millis(None) is None + assert builder.parse_ts_millis(None) is None + + assert ( + builder.make_ts_millis(datetime(2024, 1, 1, 2, 3, 4, 5, timezone.utc)) + == 1704074584000 + ) + + # We only have millisecond precision, don't support microseconds. + ts = datetime.now(timezone.utc).replace(microsecond=0) + ts_millis = builder.make_ts_millis(ts) + assert builder.parse_ts_millis(ts_millis) == ts diff --git a/metadata-ingestion/tests/unit/serde/test_codegen.py b/metadata-ingestion/tests/unit/serde/test_codegen.py index 98d62d5643ff2..b49f715312913 100644 --- a/metadata-ingestion/tests/unit/serde/test_codegen.py +++ b/metadata-ingestion/tests/unit/serde/test_codegen.py @@ -6,11 +6,10 @@ import pytest import typing_inspect -from datahub.emitter.enum_helpers import get_enum_options +from datahub.emitter.mce_builder import ALL_ENV_TYPES from datahub.metadata.schema_classes import ( ASPECT_CLASSES, KEY_ASPECTS, - FabricTypeClass, FineGrainedLineageClass, MetadataChangeEventClass, OwnershipClass, @@ -164,8 +163,7 @@ def _err(msg: str) -> None: def test_enum_options(): # This is mainly a sanity check to ensure that it doesn't do anything too crazy. - env_options = get_enum_options(FabricTypeClass) - assert "PROD" in env_options + assert "PROD" in ALL_ENV_TYPES def test_urn_types() -> None: diff --git a/smoke-test/smoke.sh b/smoke-test/smoke.sh index ec8188ebf5f4d..1d209b4ba8219 100755 --- a/smoke-test/smoke.sh +++ b/smoke-test/smoke.sh @@ -22,7 +22,9 @@ else echo "datahub:datahub" > ~/.datahub/plugins/frontend/auth/user.props python3 -m venv venv + set +x source venv/bin/activate + set -x python -m pip install --upgrade 'uv>=0.1.10' uv pip install -r requirements.txt fi