start using parse_ts_millis everywhere
hsheth2 committed Dec 27, 2024
1 parent 113df7b commit 59403b0
Showing 10 changed files with 42 additions and 68 deletions.
9 changes: 5 additions & 4 deletions metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py
@@ -1,6 +1,6 @@
import logging
from dataclasses import dataclass
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from pydantic import Field
@@ -10,6 +10,7 @@
    CircuitBreakerConfig,
)
from datahub.api.graphql import Assertion, Operation
+from datahub.emitter.mce_builder import parse_ts_millis

logger: logging.Logger = logging.getLogger(__name__)

@@ -49,7 +50,7 @@ def get_last_updated(self, urn: str) -> Optional[datetime]:
        if not operations:
            return None
        else:
-            return datetime.fromtimestamp(operations[0]["lastUpdatedTimestamp"] / 1000)
+            return parse_ts_millis(operations[0]["lastUpdatedTimestamp"])

    def _check_if_assertion_failed(
        self, assertions: List[Dict[str, Any]], last_updated: Optional[datetime] = None
@@ -93,7 +94,7 @@ class AssertionResult:
logger.info(f"Found successful assertion: {assertion_urn}")
result = False
if last_updated is not None:
last_run = datetime.fromtimestamp(last_assertion.time / 1000)
last_run = parse_ts_millis(last_assertion.time)
if last_updated > last_run:
logger.error(
f"Missing assertion run for {assertion_urn}. The dataset was updated on {last_updated} but the last assertion run was at {last_run}"
@@ -117,7 +118,7 @@ def is_circuit_breaker_active(self, urn: str) -> bool:
        )

        if not last_updated:
-            last_updated = datetime.now() - self.config.time_delta
+            last_updated = datetime.now(tz=timezone.utc) - self.config.time_delta

Codecov / codecov/patch: added line metadata-ingestion/src/datahub/api/circuit_breaker/assertion_circuit_breaker.py#L121 was not covered by tests.
            logger.info(
                f"Dataset {urn} doesn't have last updated or check_last_assertion_time is false, using calculated min assertion date {last_updated}"
            )
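Note the companion change on the fallback line: parse_ts_millis returns timezone-aware UTC datetimes, and Python raises a TypeError when comparing naive and aware values, so the datetime.now() fallback has to become aware as well. A minimal sketch of the failure mode this avoids (standard library only):

    from datetime import datetime, timezone

    aware = datetime.now(tz=timezone.utc)  # what parse_ts_millis produces
    naive = datetime.now()                 # the old fallback
    try:
        naive < aware
    except TypeError as e:
        print(e)  # can't compare offset-naive and offset-aware datetimes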
4 changes: 3 additions & 1 deletion metadata-ingestion/src/datahub/configuration/datetimes.py
@@ -7,6 +7,8 @@
import dateutil.parser
import humanfriendly

+from datahub.emitter.mce_builder import parse_ts_millis

logger = logging.getLogger(__name__)


@@ -39,7 +41,7 @@ def parse_user_datetime(input: str) -> datetime:
        return datetime.fromtimestamp(ts, tz=timezone.utc)
    except (OverflowError, ValueError):
        # This is likely a timestamp in milliseconds.
-        return datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
+        return parse_ts_millis(ts)

Codecov / codecov/patch: added line metadata-ingestion/src/datahub/configuration/datetimes.py#L44 was not covered by tests.

    # Then try parsing as a relative time.
    with contextlib.suppress(humanfriendly.InvalidTimespan):
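For context: parse_user_datetime first reads a bare number as epoch seconds; a millisecond value for a present-day date (around 1.7e12) lands tens of thousands of years outside datetime's range and raises, which is what triggers this fallback. A standalone sketch of the heuristic, assuming only the standard library:

    from datetime import datetime, timezone

    def parse_numeric_timestamp(ts: float) -> datetime:
        try:
            # Interpret as epoch seconds first.
            return datetime.fromtimestamp(ts, tz=timezone.utc)
        except (OverflowError, ValueError):
            # Millisecond inputs overflow the year range on most platforms,
            # so reinterpret as milliseconds.
            return datetime.fromtimestamp(ts / 1000, tz=timezone.utc)

    print(parse_numeric_timestamp(1735257600))     # 2024-12-27 00:00:00+00:00
    print(parse_numeric_timestamp(1735257600000))  # same instant, given in millis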
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -104,7 +104,7 @@ def make_ts_millis(ts: Optional[datetime]) -> Optional[int]:


@overload
-def parse_ts_millis(ts: int) -> datetime:
+def parse_ts_millis(ts: float) -> datetime:
    ...

Codecov / codecov/patch: added line metadata-ingestion/src/datahub/emitter/mce_builder.py#L108 was not covered by tests.


@@ -113,7 +113,7 @@ def parse_ts_millis(ts: None) -> None:
    ...

Codecov / codecov/patch: added line metadata-ingestion/src/datahub/emitter/mce_builder.py#L113 was not covered by tests.


-def parse_ts_millis(ts: Optional[int]) -> Optional[datetime]:
+def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]:
    if ts is None:
        return None
    return datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
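The widened overloads above are the crux of the commit: parse_ts_millis now takes ints or floats and passes None through untouched, so callers can drop their `if x is not None` wrappers (see the BigQuery and Unity changes below). A self-contained sketch of the behavior, mirroring the diff:

    from datetime import datetime, timezone
    from typing import Optional, overload

    @overload
    def parse_ts_millis(ts: float) -> datetime: ...
    @overload
    def parse_ts_millis(ts: None) -> None: ...
    def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]:
        if ts is None:
            return None
        return datetime.fromtimestamp(ts / 1000, tz=timezone.utc)

    assert parse_ts_millis(None) is None
    assert parse_ts_millis(1735257600000) == datetime(2024, 12, 27, tzinfo=timezone.utc)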
@@ -1,5 +1,4 @@
import logging
-from datetime import datetime, timezone
from typing import (
    TYPE_CHECKING,
    Dict,
@@ -14,7 +13,7 @@
)

from datahub.configuration.time_window_config import BaseTimeWindowConfig
-from datahub.emitter.mce_builder import make_dataplatform_instance_urn
+from datahub.emitter.mce_builder import make_dataplatform_instance_urn, parse_ts_millis
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import entity_supports_aspect
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -480,7 +479,7 @@ def auto_empty_dataset_usage_statistics(
        logger.warning(
            f"Usage statistics with unexpected timestamps, bucket_duration={config.bucket_duration}:\n"
            ", ".join(
-                str(datetime.fromtimestamp(ts / 1000, tz=timezone.utc))
+                str(parse_ts_millis(ts))
                for ts in invalid_timestamps
            )
        )
@@ -1,7 +1,7 @@
import logging
from collections import defaultdict
from dataclasses import dataclass, field
-from datetime import datetime, timezone
+from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, FrozenSet, Iterable, Iterator, List, Optional

@@ -15,6 +15,7 @@
    TimePartitioningType,
)

+from datahub.emitter.mce_builder import parse_ts_millis
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
from datahub.ingestion.source.bigquery_v2.bigquery_helper import parse_labels
@@ -393,13 +394,7 @@ def _make_bigquery_table(
            name=table.table_name,
            created=table.created,
            table_type=table.table_type,
-            last_altered=(
-                datetime.fromtimestamp(
-                    table.get("last_altered") / 1000, tz=timezone.utc
-                )
-                if table.get("last_altered") is not None
-                else None
-            ),
+            last_altered=parse_ts_millis(table.get("last_altered")),
            size_in_bytes=table.get("bytes"),
            rows_count=table.get("row_count"),
            comment=table.comment,
@@ -460,11 +455,7 @@ def _make_bigquery_view(view: bigquery.Row) -> BigqueryView:
    return BigqueryView(
        name=view.table_name,
        created=view.created,
-        last_altered=(
-            datetime.fromtimestamp(view.get("last_altered") / 1000, tz=timezone.utc)
-            if view.get("last_altered") is not None
-            else None
-        ),
+        last_altered=(parse_ts_millis(view.get("last_altered"))),
        comment=view.comment,
        view_definition=view.view_definition,
        materialized=view.table_type == BigqueryTableType.MATERIALIZED_VIEW,
@@ -705,13 +696,7 @@ def _make_bigquery_table_snapshot(snapshot: bigquery.Row) -> BigqueryTableSnapshot:
    return BigqueryTableSnapshot(
        name=snapshot.table_name,
        created=snapshot.created,
-        last_altered=(
-            datetime.fromtimestamp(
-                snapshot.get("last_altered") / 1000, tz=timezone.utc
-            )
-            if snapshot.get("last_altered") is not None
-            else None
-        ),
+        last_altered=parse_ts_millis(snapshot.get("last_altered")),
        comment=snapshot.comment,
        ddl=snapshot.ddl,
        snapshot_time=snapshot.snapshot_time,
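The payoff in this file: each seven-line `if ... is not None` ternary collapses into one call, and type checkers still narrow the result through the overloads — a plain float argument yields datetime, an Optional[float] yields Optional[datetime]. A small sketch (the `row` lookup is hypothetical):

    from typing import Optional

    from datahub.emitter.mce_builder import parse_ts_millis

    ts_known: float = 1735257600000.0
    dt_known = parse_ts_millis(ts_known)  # mypy infers: datetime

    row: dict = {}                                       # hypothetical row
    ts_maybe: Optional[float] = row.get("last_altered")  # may be None
    dt_maybe = parse_ts_millis(ts_maybe)  # mypy infers: Optional[datetime]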
3 changes: 2 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py
@@ -12,6 +12,7 @@
from confluent_kafka.schema_registry.avro import AvroDeserializer

from datahub.configuration.kafka import KafkaConsumerConnectionConfig
+from datahub.emitter.mce_builder import parse_ts_millis

Codecov / codecov/patch: added line metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py#L15 was not covered by tests.
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.datahub.config import DataHubSourceConfig
@@ -92,7 +93,7 @@ def _poll_partition(
            if mcl.created and mcl.created.time > stop_time.timestamp() * 1000:
                logger.info(
                    f"Stopped reading from kafka, reached MCL "
-                    f"with audit stamp {datetime.fromtimestamp(mcl.created.time / 1000)}"
+                    f"with audit stamp {parse_ts_millis(mcl.created.time)}"
                )
                break

@@ -7,7 +7,10 @@
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.reflection import Inspector

-from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
+from datahub.emitter.mce_builder import (
+    make_dataset_urn_with_platform_instance,
+    parse_ts_millis,
+)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.ge_data_profiler import (
@@ -245,11 +248,7 @@ def is_dataset_eligible_for_profiling(
                # If profiling state exists we have to carry over to the new state
                self.state_handler.add_to_state(dataset_urn, last_profiled)

-        threshold_time: Optional[datetime] = (
-            datetime.fromtimestamp(last_profiled / 1000, timezone.utc)
-            if last_profiled
-            else None
-        )
+        threshold_time: Optional[datetime] = parse_ts_millis(last_profiled)
        if (
            not threshold_time
            and self.config.profiling.profile_if_updated_since_days is not None
@@ -12,6 +12,7 @@
import pydantic

from datahub.configuration.common import ConfigModel
+from datahub.emitter.mce_builder import parse_ts_millis
from datahub.metadata.schema_classes import (
DatahubIngestionCheckpointClass,
IngestionCheckpointStateClass,
@@ -144,7 +145,7 @@ def create_from_checkpoint_aspect(
            )
            logger.info(
                f"Successfully constructed last checkpoint state for job {job_name} "
-                f"with timestamp {datetime.fromtimestamp(checkpoint_aspect.timestampMillis/1000, tz=timezone.utc)}"
+                f"with timestamp {parse_ts_millis(checkpoint_aspect.timestampMillis)}"
            )
            return checkpoint
        return None
35 changes: 8 additions & 27 deletions metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
@@ -4,7 +4,7 @@

import dataclasses
import logging
-from datetime import datetime, timezone
+from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Union, cast
from unittest.mock import patch

@@ -27,6 +27,7 @@
from databricks.sdk.service.workspace import ObjectType

import datahub
+from datahub.emitter.mce_builder import parse_ts_millis
from datahub.ingestion.source.unity.hive_metastore_proxy import HiveMetastoreProxy
from datahub.ingestion.source.unity.proxy_profiling import (
UnityCatalogProxyProfilingMixin,
@@ -211,16 +212,8 @@ def workspace_notebooks(self) -> Iterable[Notebook]:
                id=obj.object_id,
                path=obj.path,
                language=obj.language,
-                created_at=(
-                    datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
-                    if obj.created_at
-                    else None
-                ),
-                modified_at=(
-                    datetime.fromtimestamp(obj.modified_at / 1000, tz=timezone.utc)
-                    if obj.modified_at
-                    else None
-                ),
+                created_at=parse_ts_millis(obj.created_at),
+                modified_at=parse_ts_millis(obj.modified_at),
            )

    def query_history(
@@ -452,17 +445,9 @@ def _create_table(
            properties=obj.properties or {},
            owner=obj.owner,
            generation=obj.generation,
-            created_at=(
-                datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
-                if obj.created_at
-                else None
-            ),
+            created_at=(parse_ts_millis(obj.created_at) if obj.created_at else None),
            created_by=obj.created_by,
-            updated_at=(
-                datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc)
-                if obj.updated_at
-                else None
-            ),
+            updated_at=(parse_ts_millis(obj.updated_at) if obj.updated_at else None),
            updated_by=obj.updated_by,
            table_id=obj.table_id,
            comment=obj.comment,
@@ -500,12 +485,8 @@ def _create_query(self, info: QueryInfo) -> Optional[Query]:
            query_id=info.query_id,
            query_text=info.query_text,
            statement_type=info.statement_type,
-            start_time=datetime.fromtimestamp(
-                info.query_start_time_ms / 1000, tz=timezone.utc
-            ),
-            end_time=datetime.fromtimestamp(
-                info.query_end_time_ms / 1000, tz=timezone.utc
-            ),
+            start_time=parse_ts_millis(info.query_start_time_ms),
+            end_time=parse_ts_millis(info.query_end_time_ms),
            user_id=info.user_id,
            user_name=info.user_name,
            executed_as_user_id=info.executed_as_user_id,
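One subtlety worth noting: in _create_table the explicit `if obj.created_at else None` guard is kept rather than relying on parse_ts_millis alone. The helper only maps None to None; the truthiness guard additionally maps a raw 0 to None instead of the 1970 epoch, a behavior the profiler change above gives up by dropping its guard. In short:

    from datahub.emitter.mce_builder import parse_ts_millis

    parse_ts_millis(0)                   # 1970-01-01 00:00:00+00:00 (epoch, not None)
    ts = 0
    parse_ts_millis(ts) if ts else None  # None: truthiness treats 0 as missing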
11 changes: 8 additions & 3 deletions metadata-ingestion/src/datahub/utilities/time.py
@@ -1,6 +1,8 @@
import time
from dataclasses import dataclass
-from datetime import datetime, timezone
+from datetime import datetime
+
+from datahub.emitter.mce_builder import make_ts_millis, parse_ts_millis


def get_current_time_in_seconds() -> int:
@@ -9,12 +11,15 @@ def get_current_time_in_seconds() -> int:

def ts_millis_to_datetime(ts_millis: int) -> datetime:
    """Converts input timestamp in milliseconds to a datetime object with UTC timezone"""
-    return datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)
+    return parse_ts_millis(ts_millis)

Codecov / codecov/patch: added line metadata-ingestion/src/datahub/utilities/time.py#L14 was not covered by tests.


def datetime_to_ts_millis(dt: datetime) -> int:
    """Converts a datetime object to timestamp in milliseconds"""
-    return int(round(dt.timestamp() * 1000))
+    # TODO: Deprecate these helpers in favor of make_ts_millis and parse_ts_millis.
+    # The other ones support None with a typing overload.
+    # Also possibly move those helpers to this file.
+    return make_ts_millis(dt)


@dataclass
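To close the loop on the TODO above: make_ts_millis and parse_ts_millis are inverses up to millisecond rounding, and both accept None per their Optional signatures. A quick round-trip sketch using the import this file now has:

    from datetime import datetime, timezone

    from datahub.emitter.mce_builder import make_ts_millis, parse_ts_millis

    now = datetime.now(tz=timezone.utc)
    ms = make_ts_millis(now)        # int milliseconds since the epoch (None stays None)
    restored = parse_ts_millis(ms)  # timezone-aware UTC datetime
    assert restored is not None
    assert abs((restored - now).total_seconds()) < 0.001  # sub-millisecond rounding only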
