feat(ingest): add parse_ts_millis helper (#12231)
hsheth2 authored Dec 27, 2024
1 parent ed8639e commit d042354
Showing 23 changed files with 145 additions and 148 deletions.
@@ -1,6 +1,6 @@
 import logging
 from dataclasses import dataclass
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from typing import Any, Dict, List, Optional
 
 from pydantic import Field
@@ -10,6 +10,7 @@
     CircuitBreakerConfig,
 )
 from datahub.api.graphql import Assertion, Operation
+from datahub.emitter.mce_builder import parse_ts_millis
 
 logger: logging.Logger = logging.getLogger(__name__)
 
@@ -49,7 +50,7 @@ def get_last_updated(self, urn: str) -> Optional[datetime]:
         if not operations:
             return None
         else:
-            return datetime.fromtimestamp(operations[0]["lastUpdatedTimestamp"] / 1000)
+            return parse_ts_millis(operations[0]["lastUpdatedTimestamp"])
 
     def _check_if_assertion_failed(
         self, assertions: List[Dict[str, Any]], last_updated: Optional[datetime] = None
@@ -93,7 +94,7 @@ class AssertionResult:
                 logger.info(f"Found successful assertion: {assertion_urn}")
                 result = False
             if last_updated is not None:
-                last_run = datetime.fromtimestamp(last_assertion.time / 1000)
+                last_run = parse_ts_millis(last_assertion.time)
                 if last_updated > last_run:
                     logger.error(
                         f"Missing assertion run for {assertion_urn}. The dataset was updated on {last_updated} but the last assertion run was at {last_run}"
@@ -117,7 +118,7 @@ def is_circuit_breaker_active(self, urn: str) -> bool:
         )
 
         if not last_updated:
-            last_updated = datetime.now() - self.config.time_delta
+            last_updated = datetime.now(tz=timezone.utc) - self.config.time_delta
             logger.info(
                 f"Dataset {urn} doesn't have last updated or check_last_assertion_time is false, using calculated min assertion date {last_updated}"
             )
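Note: the switch to datetime.now(tz=timezone.utc) above is not cosmetic. parse_ts_millis returns timezone-aware UTC datetimes, and Python refuses to compare aware and naive values, so the fallback last_updated must be aware as well. A minimal illustration of the failure this avoids:

    from datetime import datetime, timezone

    aware = datetime.now(tz=timezone.utc)
    naive = datetime.now()
    aware > naive  # TypeError: can't compare offset-naive and offset-aware datetimes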
18 changes: 17 additions & 1 deletion metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -6,7 +6,7 @@
 import os
 import re
 import time
-from datetime import datetime
+from datetime import datetime, timezone
 from enum import Enum
 from typing import (
     TYPE_CHECKING,
@@ -103,6 +103,22 @@ def make_ts_millis(ts: Optional[datetime]) -> Optional[int]:
     return int(ts.timestamp() * 1000)
 
 
+@overload
+def parse_ts_millis(ts: float) -> datetime:
+    ...
+
+
+@overload
+def parse_ts_millis(ts: None) -> None:
+    ...
+
+
+def parse_ts_millis(ts: Optional[float]) -> Optional[datetime]:
+    if ts is None:
+        return None
+    return datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
+
+
 def make_data_platform_urn(platform: str) -> str:
     if platform.startswith("urn:li:dataPlatform:"):
         return platform
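The two @overload declarations give callers precise types: a non-None timestamp yields a datetime and None passes through as None, so call sites no longer need their own None checks. A short usage sketch (timestamp value chosen for illustration):

    from datahub.emitter.mce_builder import parse_ts_millis

    parse_ts_millis(1735257600000)  # datetime(2024, 12, 27, 0, 0, tzinfo=timezone.utc)
    parse_ts_millis(None)           # None, and mypy narrows the return type accordingly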
9 changes: 2 additions & 7 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
@@ -4,8 +4,8 @@
 from pydantic.main import BaseModel
 
 from datahub.cli.env_utils import get_boolean_env_variable
-from datahub.emitter.enum_helpers import get_enum_options
 from datahub.emitter.mce_builder import (
+    ALL_ENV_TYPES,
     Aspect,
     datahub_guid,
     make_container_urn,
@@ -25,7 +25,6 @@
     ContainerClass,
     DomainsClass,
     EmbedClass,
-    FabricTypeClass,
     GlobalTagsClass,
     MetadataChangeEventClass,
     OwnerClass,
@@ -206,11 +205,7 @@ def gen_containers(
     # Extra validation on the env field.
     # In certain cases (mainly for backwards compatibility), the env field will actually
     # have a platform instance name.
-    env = (
-        container_key.env
-        if container_key.env in get_enum_options(FabricTypeClass)
-        else None
-    )
+    env = container_key.env if container_key.env in ALL_ENV_TYPES else None
 
     container_urn = container_key.as_urn()
 
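ALL_ENV_TYPES replaces the per-call get_enum_options(FabricTypeClass) lookup, which also lets this module drop its FabricTypeClass import. The constant itself is not shown in this diff; presumably it is a set of the same enum options, precomputed once in mce_builder, roughly:

    # Sketch only; the real definition lives in datahub.emitter.mce_builder.
    from typing import Set

    from datahub.emitter.enum_helpers import get_enum_options
    from datahub.metadata.schema_classes import FabricTypeClass

    ALL_ENV_TYPES: Set[str] = set(get_enum_options(FabricTypeClass))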
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py
@@ -2,7 +2,7 @@
 import time
 from collections import defaultdict
 from dataclasses import dataclass
-from typing import Any, Dict, Iterable, List, Optional, Sequence, Union
+from typing import Any, Dict, List, Optional, Sequence, Union
 
 from datahub.emitter.aspect import JSON_PATCH_CONTENT_TYPE
 from datahub.emitter.serialization_helper import pre_json_transform
@@ -75,7 +75,7 @@ def _add_patch(
         # TODO: Validate that aspectName is a valid aspect for this entityType
         self.patches[aspect_name].append(_Patch(op, path, value))
 
-    def build(self) -> Iterable[MetadataChangeProposalClass]:
+    def build(self) -> List[MetadataChangeProposalClass]:
         return [
             MetadataChangeProposalClass(
                 entityUrn=self.urn,
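Annotating build() as returning List rather than Iterable matches what the method already does (it builds a list) and promises more to callers: a list has a length and can be iterated repeatedly, which a bare Iterable does not guarantee. For example, with hypothetical patch_builder and emitter instances:

    mcps = patch_builder.build()
    print(f"Emitting {len(mcps)} patch proposals")  # len() needs a concrete sequence
    for mcp in mcps:
        emitter.emit_mcp(mcp)
    retry_queue = list(mcps)  # safe: a List can be consumed more than once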
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -3,7 +3,7 @@
 import logging
 import os
 from json.decoder import JSONDecodeError
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union
 
 import requests
 from deprecated import deprecated
@@ -288,7 +288,7 @@ def emit_mcp(
 
     def emit_mcps(
         self,
-        mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
+        mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
         async_flag: Optional[bool] = None,
     ) -> int:
         logger.debug("Attempting to emit batch mcps")
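Loosening the mcps parameter to Sequence fixes a mypy friction point: List is invariant, so a List[MetadataChangeProposalWrapper] was rejected where List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]] was expected, while the read-only Sequence is covariant and accepts it. A minimal illustration with stand-in classes:

    from typing import List, Sequence, Union

    class Proposal: ...
    class Wrapper: ...

    def wants_list(mcps: List[Union[Proposal, Wrapper]]) -> None: ...
    def wants_seq(mcps: Sequence[Union[Proposal, Wrapper]]) -> None: ...

    wrappers: List[Wrapper] = [Wrapper()]
    wants_list(wrappers)  # mypy error: List is invariant
    wants_seq(wrappers)   # accepted: Sequence is covariant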
@@ -1,5 +1,4 @@
 import logging
-from datetime import datetime, timezone
 from typing import (
     TYPE_CHECKING,
     Dict,
@@ -14,7 +13,7 @@
 )
 
 from datahub.configuration.time_window_config import BaseTimeWindowConfig
-from datahub.emitter.mce_builder import make_dataplatform_instance_urn
+from datahub.emitter.mce_builder import make_dataplatform_instance_urn, parse_ts_millis
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import entity_supports_aspect
 from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -479,10 +478,7 @@ def auto_empty_dataset_usage_statistics(
         if invalid_timestamps:
             logger.warning(
                 f"Usage statistics with unexpected timestamps, bucket_duration={config.bucket_duration}:\n"
-                ", ".join(
-                    str(datetime.fromtimestamp(ts / 1000, tz=timezone.utc))
-                    for ts in invalid_timestamps
-                )
+                ", ".join(str(parse_ts_millis(ts)) for ts in invalid_timestamps)
             )
 
         for bucket in bucket_timestamps:
@@ -1,7 +1,7 @@
 import logging
 from collections import defaultdict
 from dataclasses import dataclass, field
-from datetime import datetime, timezone
+from datetime import datetime
 from functools import lru_cache
 from typing import Any, Dict, FrozenSet, Iterable, Iterator, List, Optional
 
@@ -15,6 +15,7 @@
     TimePartitioningType,
 )
 
+from datahub.emitter.mce_builder import parse_ts_millis
 from datahub.ingestion.api.source import SourceReport
 from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
 from datahub.ingestion.source.bigquery_v2.bigquery_helper import parse_labels
@@ -393,13 +394,7 @@ def _make_bigquery_table(
             name=table.table_name,
             created=table.created,
             table_type=table.table_type,
-            last_altered=(
-                datetime.fromtimestamp(
-                    table.get("last_altered") / 1000, tz=timezone.utc
-                )
-                if table.get("last_altered") is not None
-                else None
-            ),
+            last_altered=parse_ts_millis(table.get("last_altered")),
             size_in_bytes=table.get("bytes"),
             rows_count=table.get("row_count"),
             comment=table.comment,
@@ -460,11 +455,7 @@ def _make_bigquery_view(view: bigquery.Row) -> BigqueryView:
         return BigqueryView(
             name=view.table_name,
             created=view.created,
-            last_altered=(
-                datetime.fromtimestamp(view.get("last_altered") / 1000, tz=timezone.utc)
-                if view.get("last_altered") is not None
-                else None
-            ),
+            last_altered=(parse_ts_millis(view.get("last_altered"))),
             comment=view.comment,
             view_definition=view.view_definition,
             materialized=view.table_type == BigqueryTableType.MATERIALIZED_VIEW,
@@ -705,13 +696,7 @@ def _make_bigquery_table_snapshot(snapshot: bigquery.Row) -> BigqueryTableSnapshot:
         return BigqueryTableSnapshot(
             name=snapshot.table_name,
             created=snapshot.created,
-            last_altered=(
-                datetime.fromtimestamp(
-                    snapshot.get("last_altered") / 1000, tz=timezone.utc
-                )
-                if snapshot.get("last_altered") is not None
-                else None
-            ),
+            last_altered=parse_ts_millis(snapshot.get("last_altered")),
             comment=snapshot.comment,
             ddl=snapshot.ddl,
             snapshot_time=snapshot.snapshot_time,
@@ -12,6 +12,7 @@
 from confluent_kafka.schema_registry.avro import AvroDeserializer
 
 from datahub.configuration.kafka import KafkaConsumerConnectionConfig
+from datahub.emitter.mce_builder import parse_ts_millis
 from datahub.ingestion.api.closeable import Closeable
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.source.datahub.config import DataHubSourceConfig
@@ -92,7 +93,7 @@ def _poll_partition(
             if mcl.created and mcl.created.time > stop_time.timestamp() * 1000:
                 logger.info(
                     f"Stopped reading from kafka, reached MCL "
-                    f"with audit stamp {datetime.fromtimestamp(mcl.created.time / 1000)}"
+                    f"with audit stamp {parse_ts_millis(mcl.created.time)}"
                 )
                 break
 
@@ -7,7 +7,10 @@
 from sqlalchemy import create_engine, inspect
 from sqlalchemy.engine.reflection import Inspector
 
-from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
+from datahub.emitter.mce_builder import (
+    make_dataset_urn_with_platform_instance,
+    parse_ts_millis,
+)
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.ge_data_profiler import (
@@ -245,11 +248,7 @@ def is_dataset_eligible_for_profiling(
                 # If profiling state exists we have to carry over to the new state
                 self.state_handler.add_to_state(dataset_urn, last_profiled)
 
-        threshold_time: Optional[datetime] = (
-            datetime.fromtimestamp(last_profiled / 1000, timezone.utc)
-            if last_profiled
-            else None
-        )
+        threshold_time: Optional[datetime] = parse_ts_millis(last_profiled)
         if (
             not threshold_time
             and self.config.profiling.profile_if_updated_since_days is not None
@@ -12,6 +12,7 @@
 import pydantic
 
 from datahub.configuration.common import ConfigModel
+from datahub.emitter.mce_builder import parse_ts_millis
 from datahub.metadata.schema_classes import (
     DatahubIngestionCheckpointClass,
     IngestionCheckpointStateClass,
@@ -144,7 +145,7 @@ def create_from_checkpoint_aspect(
             )
             logger.info(
                 f"Successfully constructed last checkpoint state for job {job_name} "
-                f"with timestamp {datetime.fromtimestamp(checkpoint_aspect.timestampMillis/1000, tz=timezone.utc)}"
+                f"with timestamp {parse_ts_millis(checkpoint_aspect.timestampMillis)}"
             )
             return checkpoint
         return None
35 changes: 8 additions & 27 deletions metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
@@ -4,7 +4,7 @@
 
 import dataclasses
 import logging
-from datetime import datetime, timezone
+from datetime import datetime
 from typing import Any, Dict, Iterable, List, Optional, Union, cast
 from unittest.mock import patch
 
@@ -27,6 +27,7 @@
 from databricks.sdk.service.workspace import ObjectType
 
 import datahub
+from datahub.emitter.mce_builder import parse_ts_millis
 from datahub.ingestion.source.unity.hive_metastore_proxy import HiveMetastoreProxy
 from datahub.ingestion.source.unity.proxy_profiling import (
     UnityCatalogProxyProfilingMixin,
@@ -211,16 +212,8 @@ def workspace_notebooks(self) -> Iterable[Notebook]:
                 id=obj.object_id,
                 path=obj.path,
                 language=obj.language,
-                created_at=(
-                    datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
-                    if obj.created_at
-                    else None
-                ),
-                modified_at=(
-                    datetime.fromtimestamp(obj.modified_at / 1000, tz=timezone.utc)
-                    if obj.modified_at
-                    else None
-                ),
+                created_at=parse_ts_millis(obj.created_at),
+                modified_at=parse_ts_millis(obj.modified_at),
             )
 
     def query_history(
def query_history(
Expand Down Expand Up @@ -452,17 +445,9 @@ def _create_table(
properties=obj.properties or {},
owner=obj.owner,
generation=obj.generation,
created_at=(
datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
if obj.created_at
else None
),
created_at=(parse_ts_millis(obj.created_at) if obj.created_at else None),
created_by=obj.created_by,
updated_at=(
datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc)
if obj.updated_at
else None
),
updated_at=(parse_ts_millis(obj.updated_at) if obj.updated_at else None),
updated_by=obj.updated_by,
table_id=obj.table_id,
comment=obj.comment,
@@ -500,12 +485,8 @@ def _create_query(self, info: QueryInfo) -> Optional[Query]:
             query_id=info.query_id,
             query_text=info.query_text,
             statement_type=info.statement_type,
-            start_time=datetime.fromtimestamp(
-                info.query_start_time_ms / 1000, tz=timezone.utc
-            ),
-            end_time=datetime.fromtimestamp(
-                info.query_end_time_ms / 1000, tz=timezone.utc
-            ),
+            start_time=parse_ts_millis(info.query_start_time_ms),
+            end_time=parse_ts_millis(info.query_end_time_ms),
             user_id=info.user_id,
             user_name=info.user_name,
             executed_as_user_id=info.executed_as_user_id,
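One subtlety above: _create_table keeps the explicit "if obj.created_at else None" guard even though parse_ts_millis already maps None to None. The guard also treats a falsy 0 timestamp as unset rather than converting it to the epoch, preserving the previous behavior:

    ts = 0
    parse_ts_millis(ts)                  # datetime(1970, 1, 1, 0, 0, tzinfo=timezone.utc)
    parse_ts_millis(ts) if ts else None  # None: a zero timestamp stays "unset"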
11 changes: 8 additions & 3 deletions metadata-ingestion/src/datahub/utilities/time.py
@@ -1,6 +1,8 @@
 import time
 from dataclasses import dataclass
-from datetime import datetime, timezone
+from datetime import datetime
 
+from datahub.emitter.mce_builder import make_ts_millis, parse_ts_millis
+
 
 def get_current_time_in_seconds() -> int:
@@ -9,12 +11,15 @@ def get_current_time_in_seconds() -> int:
 
 def ts_millis_to_datetime(ts_millis: int) -> datetime:
     """Converts input timestamp in milliseconds to a datetime object with UTC timezone"""
-    return datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)
+    return parse_ts_millis(ts_millis)
 
 
 def datetime_to_ts_millis(dt: datetime) -> int:
     """Converts a datetime object to timestamp in milliseconds"""
-    return int(round(dt.timestamp() * 1000))
+    # TODO: Deprecate these helpers in favor of make_ts_millis and parse_ts_millis.
+    # The other ones support None with a typing overload.
+    # Also possibly move those helpers to this file.
+    return make_ts_millis(dt)
 
 
 @dataclass
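As the new TODO notes, both wrappers now delegate to the mce_builder helpers, which also accept None via typing overloads. A small round-trip sketch (assuming a UTC-aware, millisecond-precision input):

    from datetime import datetime, timezone

    from datahub.emitter.mce_builder import make_ts_millis, parse_ts_millis

    dt = datetime(2024, 12, 27, tzinfo=timezone.utc)
    assert make_ts_millis(dt) == 1735257600000
    assert parse_ts_millis(make_ts_millis(dt)) == dt
    assert make_ts_millis(None) is None and parse_ts_millis(None) is None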