diff --git a/metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py b/metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py
index 9d2a65663ba37d..283cdaa8333338 100644
--- a/metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py
+++ b/metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py
@@ -1,6 +1,6 @@
 import logging
 from dataclasses import dataclass
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from typing import Any, Dict, List, Optional
 
 from pydantic import Field
@@ -10,6 +10,7 @@
     CircuitBreakerConfig,
 )
 from datahub.api.graphql import Assertion, Operation
+from datahub.emitter.mce_builder import parse_ts_millis
 
 logger: logging.Logger = logging.getLogger(__name__)
 
@@ -49,7 +50,7 @@ def get_last_updated(self, urn: str) -> Optional[datetime]:
         if not operations:
             return None
         else:
-            return datetime.fromtimestamp(operations[0]["lastUpdatedTimestamp"] / 1000)
+            return parse_ts_millis(operations[0]["lastUpdatedTimestamp"])
 
     def _check_if_assertion_failed(
         self, assertions: List[Dict[str, Any]], last_updated: Optional[datetime] = None
@@ -93,7 +94,7 @@ class AssertionResult:
                 logger.info(f"Found successful assertion: {assertion_urn}")
                 result = False
             if last_updated is not None:
-                last_run = datetime.fromtimestamp(last_assertion.time / 1000)
+                last_run = parse_ts_millis(last_assertion.time)
                 if last_updated > last_run:
                     logger.error(
                         f"Missing assertion run for {assertion_urn}. The dataset was updated on {last_updated} but the last assertion run was at {last_run}"
@@ -117,7 +118,7 @@ def is_circuit_breaker_active(self, urn: str) -> bool:
         )
 
         if not last_updated:
-            last_updated = datetime.now() - self.config.time_delta
+            last_updated = datetime.now(tz=timezone.utc) - self.config.time_delta
             logger.info(
                 f"Dataset {urn} doesn't have last updated or check_last_assertion_time is false, using calculated min assertion date {last_updated}"
             )
diff --git a/metadata-ingestion/src/datahub/configuration/datetimes.py b/metadata-ingestion/src/datahub/configuration/datetimes.py
index 1520462fa9bf8c..be3834dcfab243 100644
--- a/metadata-ingestion/src/datahub/configuration/datetimes.py
+++ b/metadata-ingestion/src/datahub/configuration/datetimes.py
@@ -7,6 +7,8 @@
 import dateutil.parser
 import humanfriendly
 
+from datahub.emitter.mce_builder import parse_ts_millis
+
 logger = logging.getLogger(__name__)
 
 
@@ -39,7 +41,7 @@ def parse_user_datetime(input: str) -> datetime:
             return datetime.fromtimestamp(ts, tz=timezone.utc)
         except (OverflowError, ValueError):
             # This is likely a timestamp in milliseconds.
-            return datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
+            return parse_ts_millis(ts)
 
     # Then try parsing as a relative time.
     with contextlib.suppress(humanfriendly.InvalidTimespan):
diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py
index f7b54d2811f7c7..110624aa61cb89 100644
--- a/metadata-ingestion/src/datahub/emitter/mce_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -104,7 +104,7 @@ def make_ts_millis(ts: Optional[datetime]) -> Optional[int]:
 
 
 @overload
-def parse_ts_millis(ts: int) -> datetime:
+def parse_ts_millis(ts: float) -> datetime:
     ...
 
 
@@ -113,7 +113,7 @@ def parse_ts_millis(ts: None) -> None:
     ...
 
 
-def parse_ts_millis(ts: Optional[int]) -> Optional[datetime]:
+def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]:
     if ts is None:
         return None
     return datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
index 7791ea2797be34..3ed3c9e9d79a69 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
@@ -1,5 +1,4 @@
 import logging
-from datetime import datetime, timezone
 from typing import (
     TYPE_CHECKING,
     Dict,
@@ -14,7 +13,7 @@
 )
 
 from datahub.configuration.time_window_config import BaseTimeWindowConfig
-from datahub.emitter.mce_builder import make_dataplatform_instance_urn
+from datahub.emitter.mce_builder import make_dataplatform_instance_urn, parse_ts_millis
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import entity_supports_aspect
 from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -480,7 +479,7 @@ def auto_empty_dataset_usage_statistics(
             logger.warning(
                 f"Usage statistics with unexpected timestamps, bucket_duration={config.bucket_duration}:\n"
                 ", ".join(
-                    str(datetime.fromtimestamp(ts / 1000, tz=timezone.utc))
+                    str(parse_ts_millis(ts))
                     for ts in invalid_timestamps
                 )
             )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
index 3ce34be8dc89df..cbe1f6eb978247 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
@@ -1,7 +1,7 @@
 import logging
 from collections import defaultdict
 from dataclasses import dataclass, field
-from datetime import datetime, timezone
+from datetime import datetime
 from functools import lru_cache
 from typing import Any, Dict, FrozenSet, Iterable, Iterator, List, Optional
 
@@ -15,6 +15,7 @@
     TimePartitioningType,
 )
 
+from datahub.emitter.mce_builder import parse_ts_millis
 from datahub.ingestion.api.source import SourceReport
 from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
 from datahub.ingestion.source.bigquery_v2.bigquery_helper import parse_labels
@@ -393,13 +394,7 @@ def _make_bigquery_table(
             name=table.table_name,
             created=table.created,
             table_type=table.table_type,
-            last_altered=(
-                datetime.fromtimestamp(
-                    table.get("last_altered") / 1000, tz=timezone.utc
-                )
-                if table.get("last_altered") is not None
-                else None
-            ),
+            last_altered=parse_ts_millis(table.get("last_altered")),
             size_in_bytes=table.get("bytes"),
             rows_count=table.get("row_count"),
             comment=table.comment,
@@ -460,11 +455,7 @@ def _make_bigquery_view(view: bigquery.Row) -> BigqueryView:
         return BigqueryView(
             name=view.table_name,
             created=view.created,
-            last_altered=(
-                datetime.fromtimestamp(view.get("last_altered") / 1000, tz=timezone.utc)
-                if view.get("last_altered") is not None
-                else None
-            ),
+            last_altered=(parse_ts_millis(view.get("last_altered"))),
             comment=view.comment,
             view_definition=view.view_definition,
             materialized=view.table_type == BigqueryTableType.MATERIALIZED_VIEW,
@@ -705,13 +696,7 @@ def _make_bigquery_table_snapshot(snapshot: bigquery.Row) -> BigqueryTableSnapsh
         return BigqueryTableSnapshot(
             name=snapshot.table_name,
             created=snapshot.created,
-            last_altered=(
-                datetime.fromtimestamp(
-                    snapshot.get("last_altered") / 1000, tz=timezone.utc
-                )
-                if snapshot.get("last_altered") is not None
-                else None
-            ),
+            last_altered=parse_ts_millis(snapshot.get("last_altered")),
             comment=snapshot.comment,
             ddl=snapshot.ddl,
             snapshot_time=snapshot.snapshot_time,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py
index 56a3d55abb184f..ba073533eccfb5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py
@@ -12,6 +12,7 @@
 from confluent_kafka.schema_registry.avro import AvroDeserializer
 
 from datahub.configuration.kafka import KafkaConsumerConnectionConfig
+from datahub.emitter.mce_builder import parse_ts_millis
 from datahub.ingestion.api.closeable import Closeable
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.source.datahub.config import DataHubSourceConfig
@@ -92,7 +93,7 @@ def _poll_partition(
             if mcl.created and mcl.created.time > stop_time.timestamp() * 1000:
                 logger.info(
                     f"Stopped reading from kafka, reached MCL "
-                    f"with audit stamp {datetime.fromtimestamp(mcl.created.time / 1000)}"
+                    f"with audit stamp {parse_ts_millis(mcl.created.time)}"
                 )
                 break
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
index bd6c23cc2d4644..c91be9b494c006 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
@@ -7,7 +7,10 @@
 from sqlalchemy import create_engine, inspect
 from sqlalchemy.engine.reflection import Inspector
 
-from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
+from datahub.emitter.mce_builder import (
+    make_dataset_urn_with_platform_instance,
+    parse_ts_millis,
+)
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.ge_data_profiler import (
@@ -245,11 +248,7 @@ def is_dataset_eligible_for_profiling(
                 # If profiling state exists we have to carry over to the new state
                 self.state_handler.add_to_state(dataset_urn, last_profiled)
 
-        threshold_time: Optional[datetime] = (
-            datetime.fromtimestamp(last_profiled / 1000, timezone.utc)
-            if last_profiled
-            else None
-        )
+        threshold_time: Optional[datetime] = parse_ts_millis(last_profiled)
         if (
             not threshold_time
            and self.config.profiling.profile_if_updated_since_days is not None
diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py b/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py
index 5bfd48eb754d53..2c7a4a8b6c137d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py
@@ -12,6 +12,7 @@
 import pydantic
 
 from datahub.configuration.common import ConfigModel
+from datahub.emitter.mce_builder import parse_ts_millis
 from datahub.metadata.schema_classes import (
     DatahubIngestionCheckpointClass,
     IngestionCheckpointStateClass,
@@ -144,7 +145,7 @@ def create_from_checkpoint_aspect(
             )
             logger.info(
                 f"Successfully constructed last checkpoint state for job {job_name} "
-                f"with timestamp {datetime.fromtimestamp(checkpoint_aspect.timestampMillis/1000, tz=timezone.utc)}"
+                f"with timestamp {parse_ts_millis(checkpoint_aspect.timestampMillis)}"
             )
             return checkpoint
         return None
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
index 11827bace4b5a1..9b96953794dcd5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
@@ -4,7 +4,7 @@
 
 import dataclasses
 import logging
-from datetime import datetime, timezone
+from datetime import datetime
 from typing import Any, Dict, Iterable, List, Optional, Union, cast
 from unittest.mock import patch
 
@@ -27,6 +27,7 @@
 from databricks.sdk.service.workspace import ObjectType
 
 import datahub
+from datahub.emitter.mce_builder import parse_ts_millis
 from datahub.ingestion.source.unity.hive_metastore_proxy import HiveMetastoreProxy
 from datahub.ingestion.source.unity.proxy_profiling import (
     UnityCatalogProxyProfilingMixin,
@@ -211,16 +212,8 @@ def workspace_notebooks(self) -> Iterable[Notebook]:
                 id=obj.object_id,
                 path=obj.path,
                 language=obj.language,
-                created_at=(
-                    datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
-                    if obj.created_at
-                    else None
-                ),
-                modified_at=(
-                    datetime.fromtimestamp(obj.modified_at / 1000, tz=timezone.utc)
-                    if obj.modified_at
-                    else None
-                ),
+                created_at=parse_ts_millis(obj.created_at),
+                modified_at=parse_ts_millis(obj.modified_at),
             )
 
     def query_history(
@@ -452,17 +445,9 @@ def _create_table(
             properties=obj.properties or {},
             owner=obj.owner,
             generation=obj.generation,
-            created_at=(
-                datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
-                if obj.created_at
-                else None
-            ),
+            created_at=(parse_ts_millis(obj.created_at) if obj.created_at else None),
             created_by=obj.created_by,
-            updated_at=(
-                datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc)
-                if obj.updated_at
-                else None
-            ),
+            updated_at=(parse_ts_millis(obj.updated_at) if obj.updated_at else None),
             updated_by=obj.updated_by,
             table_id=obj.table_id,
             comment=obj.comment,
@@ -500,12 +485,8 @@ def _create_query(self, info: QueryInfo) -> Optional[Query]:
             query_id=info.query_id,
             query_text=info.query_text,
             statement_type=info.statement_type,
-            start_time=datetime.fromtimestamp(
-                info.query_start_time_ms / 1000, tz=timezone.utc
-            ),
-            end_time=datetime.fromtimestamp(
-                info.query_end_time_ms / 1000, tz=timezone.utc
-            ),
+            start_time=parse_ts_millis(info.query_start_time_ms),
+            end_time=parse_ts_millis(info.query_end_time_ms),
             user_id=info.user_id,
             user_name=info.user_name,
             executed_as_user_id=info.executed_as_user_id,
diff --git a/metadata-ingestion/src/datahub/utilities/time.py b/metadata-ingestion/src/datahub/utilities/time.py
index 0df7afb19935f7..e8338ce068c844 100644
--- a/metadata-ingestion/src/datahub/utilities/time.py
+++ b/metadata-ingestion/src/datahub/utilities/time.py
@@ -1,6 +1,8 @@
 import time
 from dataclasses import dataclass
-from datetime import datetime, timezone
+from datetime import datetime
+
+from datahub.emitter.mce_builder import make_ts_millis, parse_ts_millis
 
 
 def get_current_time_in_seconds() -> int:
@@ -9,12 +11,15 @@ def get_current_time_in_seconds() -> int:
 
 def ts_millis_to_datetime(ts_millis: int) -> datetime:
     """Converts input timestamp in milliseconds to a datetime object with UTC timezone"""
-    return datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)
+    return parse_ts_millis(ts_millis)
 
 
 def datetime_to_ts_millis(dt: datetime) -> int:
     """Converts a datetime object to timestamp in milliseconds"""
-    return int(round(dt.timestamp() * 1000))
+    # TODO: Deprecate these helpers in favor of make_ts_millis and parse_ts_millis.
+    # The other ones support None with a typing overload.
+    # Also possibly move those helpers to this file.
+    return make_ts_millis(dt)
 
 
 @dataclass
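
Reviewer note (not part of the patch): every hunk above replaces an ad-hoc datetime.fromtimestamp(ts / 1000, ...) with the shared parse_ts_millis helper from datahub.emitter.mce_builder, which also makes the previously naive call sites (assertion_circuit_breaker.py, datahub_kafka_reader.py) consistently UTC-aware. A minimal behavior sketch follows, based on the parse_ts_millis body shown in the mce_builder.py hunks; the make_ts_millis body is not shown in this diff, so exact rounding behavior is an assumption:

    from datetime import datetime, timezone

    from datahub.emitter.mce_builder import make_ts_millis, parse_ts_millis

    # parse_ts_millis: epoch milliseconds -> tz-aware UTC datetime.
    # The None overload passes None through, which is what lets the call
    # sites above drop their "... if ts is not None else None" wrappers.
    assert parse_ts_millis(None) is None

    dt = parse_ts_millis(1700000000000)
    assert dt == datetime(2023, 11, 14, 22, 13, 20, tzinfo=timezone.utc)

    # make_ts_millis is the inverse direction (datetime -> millis); a
    # whole-second timestamp round-trips losslessly.
    assert make_ts_millis(dt) == 1700000000000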