From 1498c36875450b1a1f44d53e8e8c47c41a91dc69 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 29 Jan 2024 10:50:47 -0800 Subject: [PATCH] chore(cli): drop support for python 3.7 (#9731) --- .github/workflows/metadata-ingestion.yml | 4 +- docs/cli.md | 2 +- docs/how/updating-datahub.md | 11 +- docs/quickstart.md | 2 +- .../airflow-plugin/setup.py | 14 +- .../airflow-plugin/tests/unit/test_airflow.py | 204 ++-- metadata-ingestion/build.gradle | 2 +- metadata-ingestion/cli-ingestion.md | 21 +- metadata-ingestion/developing.md | 6 +- metadata-ingestion/setup.py | 23 +- metadata-ingestion/src/datahub/__init__.py | 11 +- .../src/datahub/ingestion/api/report.py | 21 +- .../src/datahub/ingestion/source/feast.py | 5 - .../ingestion/source/iceberg/iceberg.py | 5 - .../src/datahub/ingestion/source/mlflow.py | 6 - .../source/schema_inference/object.py | 2 +- .../feast/test_feast_repository.py | 7 - .../tests/integration/iceberg/test_iceberg.py | 9 +- .../integration/mlflow/test_mlflow_source.py | 184 ++-- .../integration/sql_server/test_sql_server.py | 5 - metadata-ingestion/tests/unit/test_iceberg.py | 899 +++++++++--------- .../tests/unit/test_mlflow_source.py | 225 ++--- 22 files changed, 805 insertions(+), 863 deletions(-) diff --git a/.github/workflows/metadata-ingestion.yml b/.github/workflows/metadata-ingestion.yml index 1da08b14b8b5b..e7d6b7b97c099 100644 --- a/.github/workflows/metadata-ingestion.yml +++ b/.github/workflows/metadata-ingestion.yml @@ -31,7 +31,7 @@ jobs: # DATAHUB_LOOKML_GIT_TEST_SSH_KEY: ${{ secrets.DATAHUB_LOOKML_GIT_TEST_SSH_KEY }} strategy: matrix: - python-version: ["3.7", "3.10"] + python-version: ["3.8", "3.10"] command: [ "testQuick", @@ -40,7 +40,7 @@ jobs: "testIntegrationBatch2", ] include: - - python-version: "3.7" + - python-version: "3.8" - python-version: "3.10" fail-fast: false steps: diff --git a/docs/cli.md b/docs/cli.md index cb5077db42906..927270b42259d 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -24,7 +24,7 @@ source venv/bin/activate # activate the environment Once inside the virtual environment, install `datahub` using the following commands ```shell -# Requires Python 3.7+ +# Requires Python 3.8+ python3 -m pip install --upgrade pip wheel setuptools python3 -m pip install --upgrade acryl-datahub # validate that the install was successful diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index b671e2fc5d123..6b6903b04f383 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -10,8 +10,10 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - Neo4j 5.x, may require migration from 4.x - Build requires JDK17 (Runtime Java 11) - Build requires Docker Compose > 2.20 +- #9731 - The `acryl-datahub` CLI now requires Python 3.8+ - #9601 - The Unity Catalog(UC) ingestion source config `include_metastore` is now disabled by default. This change will affect the urns of all entities in the workspace.
- Entity Hierarchy with `include_metastore: true` (Old) + Entity Hierarchy with `include_metastore: true` (Old) + ``` - UC Metastore - Catalog @@ -19,16 +21,19 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - Table ``` - Entity Hierarchy with `include_metastore: false` (New) + Entity Hierarchy with `include_metastore: false` (New) + ``` - Catalog - Schema - Table ``` + We recommend using `platform_instance` for differentiating across metastores. If stateful ingestion is enabled, running ingestion with latest cli version will perform all required cleanup. Otherwise, we recommend soft deleting all databricks data via the DataHub CLI: - `datahub delete --platform databricks --soft` and then reingesting with latest cli version. + `datahub delete --platform databricks --soft` and then reingesting with latest cli version. + - #9601 - The Unity Catalog(UC) ingestion source config `include_hive_metastore` is now enabled by default. This requires config `warehouse_id` to be set. You can disable `include_hive_metastore` by setting it to `False` to avoid ingesting legacy hive metastore catalog in Databricks. ### Potential Downtime diff --git a/docs/quickstart.md b/docs/quickstart.md index 5856ef84c0074..507be6ba05471 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -22,7 +22,7 @@ If you're interested in a managed version, [Acryl Data](https://www.acryldata.io | Linux | [Docker for Linux](https://docs.docker.com/desktop/install/linux-install/) and [Docker Compose](https://docs.docker.com/compose/install/linux/) | - **Launch the Docker engine** from command line or the desktop app. -- Ensure you have **Python 3.7+** installed & configured. (Check using `python3 --version`). +- Ensure you have **Python 3.8+** installed & configured. (Check using `python3 --version`). :::note Docker Resource Allocation diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py index 838322f83833b..1a3e844cedc1f 100644 --- a/metadata-ingestion-modules/airflow-plugin/setup.py +++ b/metadata-ingestion-modules/airflow-plugin/setup.py @@ -18,16 +18,10 @@ def get_long_description(): _self_pin = f"=={_version}" if not _version.endswith("dev0") else "" -rest_common = {"requests", "requests_file"} - base_requirements = { - # Compatibility. - "dataclasses>=0.6; python_version < '3.7'", - "mypy_extensions>=0.4.3", + f"acryl-datahub[datahub-rest]{_self_pin}", # Actual dependencies. - "pydantic>=1.5.1", "apache-airflow >= 2.0.2", - *rest_common, } plugins: Dict[str, Set[str]] = { @@ -42,9 +36,8 @@ def get_long_description(): }, "plugin-v1": set(), "plugin-v2": { - # The v2 plugin requires Python 3.8+. f"acryl-datahub[sql-parser]{_self_pin}", - "openlineage-airflow==1.2.0; python_version >= '3.8'", + "openlineage-airflow==1.2.0", }, } @@ -144,7 +137,6 @@ def get_long_description(): "Programming Language :: Python", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", @@ -161,7 +153,7 @@ def get_long_description(): ], # Package info. 
zip_safe=False, - python_requires=">=3.7", + python_requires=">=3.8", package_data={ "datahub_airflow_plugin": ["py.typed"], }, diff --git a/metadata-ingestion-modules/airflow-plugin/tests/unit/test_airflow.py b/metadata-ingestion-modules/airflow-plugin/tests/unit/test_airflow.py index 93b4af0501985..b484713e18faf 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/unit/test_airflow.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/unit/test_airflow.py @@ -1,7 +1,6 @@ import datetime import json import os -import sys from contextlib import contextmanager from typing import Iterator from unittest import mock @@ -318,137 +317,134 @@ def test_lineage_backend(mock_emit, inlets, outlets, capture_executions): # Check that the right things were emitted. assert mock_emitter.emit.call_count == 17 if capture_executions else 9 - # Running further checks based on python version because args only exists in python 3.8+ - if sys.version_info > (3, 8): - assert mock_emitter.method_calls[0].args[0].aspectName == "dataFlowInfo" + # TODO: Replace this with a golden file-based comparison. + assert mock_emitter.method_calls[0].args[0].aspectName == "dataFlowInfo" + assert ( + mock_emitter.method_calls[0].args[0].entityUrn + == "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)" + ) + + assert mock_emitter.method_calls[1].args[0].aspectName == "ownership" + assert ( + mock_emitter.method_calls[1].args[0].entityUrn + == "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)" + ) + + assert mock_emitter.method_calls[2].args[0].aspectName == "globalTags" + assert ( + mock_emitter.method_calls[2].args[0].entityUrn + == "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)" + ) + + assert mock_emitter.method_calls[3].args[0].aspectName == "dataJobInfo" + assert ( + mock_emitter.method_calls[3].args[0].entityUrn + == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)" + ) + + assert mock_emitter.method_calls[4].args[0].aspectName == "dataJobInputOutput" + assert ( + mock_emitter.method_calls[4].args[0].entityUrn + == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)" + ) + assert ( + mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[0] + == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task1_upstream)" + ) + assert ( + mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[1] + == "urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)" + ) + assert ( + mock_emitter.method_calls[4].args[0].aspect.inputDatasets[0] + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)" + ) + assert ( + mock_emitter.method_calls[4].args[0].aspect.outputDatasets[0] + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)" + ) + + assert mock_emitter.method_calls[5].args[0].aspectName == "status" + assert ( + mock_emitter.method_calls[5].args[0].entityUrn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)" + ) + + assert mock_emitter.method_calls[6].args[0].aspectName == "status" + assert ( + mock_emitter.method_calls[6].args[0].entityUrn + == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)" + ) + + assert mock_emitter.method_calls[7].args[0].aspectName == "ownership" + assert ( + mock_emitter.method_calls[7].args[0].entityUrn + == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)" + ) + + assert 
mock_emitter.method_calls[8].args[0].aspectName == "globalTags" + assert ( + mock_emitter.method_calls[8].args[0].entityUrn + == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)" + ) + + if capture_executions: assert ( - mock_emitter.method_calls[0].args[0].entityUrn - == "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)" + mock_emitter.method_calls[9].args[0].aspectName + == "dataProcessInstanceProperties" ) - - assert mock_emitter.method_calls[1].args[0].aspectName == "ownership" assert ( - mock_emitter.method_calls[1].args[0].entityUrn - == "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)" + mock_emitter.method_calls[9].args[0].entityUrn + == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29" ) - assert mock_emitter.method_calls[2].args[0].aspectName == "globalTags" assert ( - mock_emitter.method_calls[2].args[0].entityUrn - == "urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod)" + mock_emitter.method_calls[10].args[0].aspectName + == "dataProcessInstanceRelationships" ) - - assert mock_emitter.method_calls[3].args[0].aspectName == "dataJobInfo" assert ( - mock_emitter.method_calls[3].args[0].entityUrn - == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)" + mock_emitter.method_calls[10].args[0].entityUrn + == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29" ) - assert ( - mock_emitter.method_calls[4].args[0].aspectName == "dataJobInputOutput" + mock_emitter.method_calls[11].args[0].aspectName + == "dataProcessInstanceInput" ) assert ( - mock_emitter.method_calls[4].args[0].entityUrn - == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)" + mock_emitter.method_calls[11].args[0].entityUrn + == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29" ) assert ( - mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[0] - == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task1_upstream)" + mock_emitter.method_calls[12].args[0].aspectName + == "dataProcessInstanceOutput" ) assert ( - mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[1] - == "urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)" + mock_emitter.method_calls[12].args[0].entityUrn + == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29" ) + assert mock_emitter.method_calls[13].args[0].aspectName == "status" assert ( - mock_emitter.method_calls[4].args[0].aspect.inputDatasets[0] + mock_emitter.method_calls[13].args[0].entityUrn == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)" ) + assert mock_emitter.method_calls[14].args[0].aspectName == "status" assert ( - mock_emitter.method_calls[4].args[0].aspect.outputDatasets[0] + mock_emitter.method_calls[14].args[0].entityUrn == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)" ) - - assert mock_emitter.method_calls[5].args[0].aspectName == "status" assert ( - mock_emitter.method_calls[5].args[0].entityUrn - == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)" + mock_emitter.method_calls[15].args[0].aspectName + == "dataProcessInstanceRunEvent" ) - - assert mock_emitter.method_calls[6].args[0].aspectName == "status" assert ( - mock_emitter.method_calls[6].args[0].entityUrn - == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)" + mock_emitter.method_calls[15].args[0].entityUrn + == 
"urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29" ) - - assert mock_emitter.method_calls[7].args[0].aspectName == "ownership" assert ( - mock_emitter.method_calls[7].args[0].entityUrn - == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)" + mock_emitter.method_calls[16].args[0].aspectName + == "dataProcessInstanceRunEvent" ) - - assert mock_emitter.method_calls[8].args[0].aspectName == "globalTags" assert ( - mock_emitter.method_calls[8].args[0].entityUrn - == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task2)" + mock_emitter.method_calls[16].args[0].entityUrn + == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29" ) - - if capture_executions: - assert ( - mock_emitter.method_calls[9].args[0].aspectName - == "dataProcessInstanceProperties" - ) - assert ( - mock_emitter.method_calls[9].args[0].entityUrn - == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29" - ) - - assert ( - mock_emitter.method_calls[10].args[0].aspectName - == "dataProcessInstanceRelationships" - ) - assert ( - mock_emitter.method_calls[10].args[0].entityUrn - == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29" - ) - assert ( - mock_emitter.method_calls[11].args[0].aspectName - == "dataProcessInstanceInput" - ) - assert ( - mock_emitter.method_calls[11].args[0].entityUrn - == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29" - ) - assert ( - mock_emitter.method_calls[12].args[0].aspectName - == "dataProcessInstanceOutput" - ) - assert ( - mock_emitter.method_calls[12].args[0].entityUrn - == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29" - ) - assert mock_emitter.method_calls[13].args[0].aspectName == "status" - assert ( - mock_emitter.method_calls[13].args[0].entityUrn - == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)" - ) - assert mock_emitter.method_calls[14].args[0].aspectName == "status" - assert ( - mock_emitter.method_calls[14].args[0].entityUrn - == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableProduced,PROD)" - ) - assert ( - mock_emitter.method_calls[15].args[0].aspectName - == "dataProcessInstanceRunEvent" - ) - assert ( - mock_emitter.method_calls[15].args[0].entityUrn - == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29" - ) - assert ( - mock_emitter.method_calls[16].args[0].aspectName - == "dataProcessInstanceRunEvent" - ) - assert ( - mock_emitter.method_calls[16].args[0].entityUrn - == "urn:li:dataProcessInstance:5e274228107f44cc2dd7c9782168cc29" - ) diff --git a/metadata-ingestion/build.gradle b/metadata-ingestion/build.gradle index b3cc350cc109f..8338124288ec9 100644 --- a/metadata-ingestion/build.gradle +++ b/metadata-ingestion/build.gradle @@ -17,7 +17,7 @@ def get_coverage_arg(test_name) { task checkPythonVersion(type: Exec) { commandLine python_executable, '-c', - 'import sys; assert (3, 11) > sys.version_info >= (3, 7), f"Python version {sys.version_info[:2]} not allowed"' + 'import sys; assert (3, 11) > sys.version_info >= (3, 8), f"Python version {sys.version_info[:2]} not allowed"' } task environmentSetup(type: Exec, dependsOn: checkPythonVersion) { diff --git a/metadata-ingestion/cli-ingestion.md b/metadata-ingestion/cli-ingestion.md index cbdde2cd30167..48cc4ef09db91 100644 --- a/metadata-ingestion/cli-ingestion.md +++ b/metadata-ingestion/cli-ingestion.md @@ -2,26 +2,31 @@ ## Installing the CLI -Make sure you have installed DataHub CLI before following this guide. 
+Make sure you have installed DataHub CLI before following this guide. + ```shell -# Requires Python 3.7+ +# Requires Python 3.8+ python3 -m pip install --upgrade pip wheel setuptools python3 -m pip install --upgrade acryl-datahub # validate that the install was successful datahub version # If you see "command not found", try running this instead: python3 -m datahub version ``` -Check out the [CLI Installation Guide](../docs/cli.md#installation) for more installation options and troubleshooting tips. + +Check out the [CLI Installation Guide](../docs/cli.md#installation) for more installation options and troubleshooting tips. After that, install the required plugin for the ingestion. ```shell pip install 'acryl-datahub[datahub-rest]' # install the required plugin ``` -Check out the [alternative installation options](../docs/cli.md#alternate-installation-options) for more reference. + +Check out the [alternative installation options](../docs/cli.md#alternate-installation-options) for more reference. ## Configuring a Recipe + Create a recipe.yml file that defines the source and sink for metadata, as shown below. + ```yaml # my_reipe.yml source: @@ -29,7 +34,7 @@ source: config: option_1: ... - + sink: type: config: @@ -39,7 +44,8 @@ sink: For more information and examples on configuring recipes, please refer to [Recipes](recipe_overview.md). ## Ingesting Metadata -You can run ingestion using `datahub ingest` like below. + +You can run ingestion using `datahub ingest` like below. ```shell datahub ingest -c @@ -48,6 +54,7 @@ datahub ingest -c ## Reference Please refer the following pages for advanced guids on CLI ingestion. + - [Reference for `datahub ingest` command](../docs/cli.md#ingest) - [UI Ingestion Guide](../docs/ui-ingestion.md) @@ -56,4 +63,4 @@ DataHub server uses a 3 digit versioning scheme, while the CLI uses a 4 digit sc We do this because we do CLI releases at a much higher frequency than server releases, usually every few days vs twice a month. For ingestion sources, any breaking changes will be highlighted in the [release notes](../docs/how/updating-datahub.md). When fields are deprecated or otherwise changed, we will try to maintain backwards compatibility for two server releases, which is about 4-6 weeks. The CLI will also print warnings whenever deprecated options are used. -::: \ No newline at end of file +::: diff --git a/metadata-ingestion/developing.md b/metadata-ingestion/developing.md index fc3a689124b2c..47e325171ddcc 100644 --- a/metadata-ingestion/developing.md +++ b/metadata-ingestion/developing.md @@ -9,10 +9,10 @@ Also take a look at the guide to [adding a source](./adding-source.md). ### Requirements -1. Python 3.7+ must be installed in your host environment. +1. Python 3.8+ must be installed in your host environment. 2. Java 17 (gradle won't work with newer or older versions) -4. On Debian/Ubuntu: `sudo apt install python3-dev python3-venv` -5. On Fedora (if using LDAP source integration): `sudo yum install openldap-devel` +3. On Debian/Ubuntu: `sudo apt install python3-dev python3-venv` +4. 
On Fedora (if using LDAP source integration): `sudo yum install openldap-devel` ### Set up your Python environment diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index af2b54ba1cefa..f8d51997330a9 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -1,4 +1,3 @@ -import sys from typing import Dict, Set import setuptools @@ -11,7 +10,6 @@ base_requirements = { # Typing extension should be >=3.10.0.2 ideally but we can't restrict due to a Airflow 2.1 dependency conflict. "typing_extensions>=3.7.4.3", - "mypy_extensions>=0.4.3", # Actual dependencies. "typing-inspect", # pydantic 1.8.2 is incompatible with mypy 0.910. @@ -48,9 +46,7 @@ "click-spinner", "requests_file", "jsonref", - # jsonschema drops python 3.7 support in v4.18.0 - "jsonschema<=4.17.3; python_version < '3.8'", - "jsonschema; python_version >= '3.8'", + "jsonschema", "ruamel.yaml", } @@ -463,7 +459,7 @@ "black==22.12.0", "coverage>=5.1", "faker>=18.4.0", - "flake8>=3.8.3", # DEPRECATION: Once we drop Python 3.7, we can pin to 6.x. + "flake8>=6.0.0", "flake8-tidy-imports>=4.3.0", "flake8-bugbear==23.3.12", "isort>=5.7.0", @@ -489,9 +485,9 @@ "delta-lake", "druid", "elasticsearch", - "feast" if sys.version_info >= (3, 8) else None, - "iceberg" if sys.version_info >= (3, 8) else None, - "mlflow" if sys.version_info >= (3, 8) else None, + "feast", + "iceberg", + "mlflow", "json-schema", "ldap", "looker", @@ -544,14 +540,14 @@ "clickhouse", "delta-lake", "druid", - "feast" if sys.version_info >= (3, 8) else None, + "feast", "hana", "hive", - "iceberg" if sys.version_info >= (3, 8) else None, + "iceberg", "kafka-connect", "ldap", "mongodb", - "mssql" if sys.version_info >= (3, 8) else None, + "mssql", "mysql", "mariadb", "redash", @@ -699,7 +695,6 @@ "Programming Language :: Python", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", @@ -716,7 +711,7 @@ ], # Package info. zip_safe=False, - python_requires=">=3.7", + python_requires=">=3.8", package_dir={"": "src"}, packages=setuptools.find_namespace_packages(where="./src"), package_data={ diff --git a/metadata-ingestion/src/datahub/__init__.py b/metadata-ingestion/src/datahub/__init__.py index a470de7b500be..b254deb7fa30e 100644 --- a/metadata-ingestion/src/datahub/__init__.py +++ b/metadata-ingestion/src/datahub/__init__.py @@ -16,16 +16,9 @@ def nice_version_name() -> str: return __version__ -if sys.version_info < (3, 7): +if sys.version_info < (3, 8): warnings.warn( - "DataHub requires Python 3.7 or newer. " - "Please upgrade your Python version to continue using DataHub.", - FutureWarning, - stacklevel=2, - ) -elif sys.version_info < (3, 8): - warnings.warn( - "DataHub will require Python 3.8 or newer soon. " + "DataHub requires Python 3.8 or newer. 
" "Please upgrade your Python version to continue using DataHub.", FutureWarning, stacklevel=2, diff --git a/metadata-ingestion/src/datahub/ingestion/api/report.py b/metadata-ingestion/src/datahub/ingestion/api/report.py index fcca767591774..08b20d9e85691 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/report.py +++ b/metadata-ingestion/src/datahub/ingestion/api/report.py @@ -2,11 +2,10 @@ import json import logging import pprint -import sys from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum -from typing import Any, Dict, Optional +from typing import Any, Optional import humanfriendly import pydantic @@ -19,12 +18,6 @@ logger = logging.getLogger(__name__) LogLevel = Literal["ERROR", "WARNING", "INFO", "DEBUG"] -# The sort_dicts option was added in Python 3.8. -if sys.version_info >= (3, 8): - PPRINT_OPTIONS = {"sort_dicts": False} -else: - PPRINT_OPTIONS: Dict = {} - @runtime_checkable class SupportsAsObj(Protocol): @@ -32,14 +25,6 @@ def as_obj(self) -> dict: ... -def _stacklevel_if_supported(level: int) -> dict: - # The logging module added support for stacklevel in Python 3.8. - if sys.version_info >= (3, 8): - return {"stacklevel": level} - else: - return {} - - @dataclass class Report(SupportsAsObj): @staticmethod @@ -95,7 +80,7 @@ def as_obj(self) -> dict: } def as_string(self) -> str: - return pprint.pformat(self.as_obj(), width=150, **PPRINT_OPTIONS) + return pprint.pformat(self.as_obj(), width=150, sort_dicts=False) def as_json(self) -> str: return json.dumps(self.as_obj()) @@ -118,7 +103,7 @@ def logger_sev(self) -> int: return log_levels[self.severity] def log(self, msg: str) -> None: - logger.log(level=self.logger_sev, msg=msg, **_stacklevel_if_supported(3)) + logger.log(level=self.logger_sev, msg=msg, stacklevel=3) class EntityFilterReport(ReportAttribute): diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index 8faba7d113372..db0c8e9c39e7b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -1,8 +1,3 @@ -import sys - -if sys.version_info < (3, 8): - raise ImportError("Feast is only supported on Python 3.8+") - from dataclasses import dataclass from typing import Dict, Iterable, List, Optional, Tuple, Union diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py index cc7f646dcb884..2585260434a38 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py @@ -1,8 +1,3 @@ -import sys - -if sys.version_info < (3, 8): - raise ImportError("Iceberg is only supported on Python 3.8+") - import json import logging import uuid diff --git a/metadata-ingestion/src/datahub/ingestion/source/mlflow.py b/metadata-ingestion/src/datahub/ingestion/source/mlflow.py index 0668defe7b0c6..cef6d2b1bb577 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mlflow.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mlflow.py @@ -1,9 +1,3 @@ -import sys - -if sys.version_info < (3, 8): - raise ImportError("MLflow is only supported on Python 3.8+") - - from dataclasses import dataclass from typing import Any, Callable, Iterable, Optional, TypeVar, Union diff --git a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/object.py 
b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/object.py index b58bdf41ccaa5..5a11d020547e8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/object.py +++ b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/object.py @@ -1,7 +1,7 @@ from collections import Counter from typing import Any, Counter as CounterType, Dict, Sequence, Tuple, Union -from mypy_extensions import TypedDict +from typing_extensions import TypedDict class BasicSchemaDescription(TypedDict): diff --git a/metadata-ingestion/tests/integration/feast/test_feast_repository.py b/metadata-ingestion/tests/integration/feast/test_feast_repository.py index eab37f67ed155..a6bdce6722289 100644 --- a/metadata-ingestion/tests/integration/feast/test_feast_repository.py +++ b/metadata-ingestion/tests/integration/feast/test_feast_repository.py @@ -1,6 +1,3 @@ -import sys - -import pytest from freezegun import freeze_time from datahub.ingestion.run.pipeline import Pipeline @@ -8,10 +5,6 @@ FROZEN_TIME = "2020-04-14 07:00:00" -pytestmark = pytest.mark.skipif( - sys.version_info < (3, 8), reason="requires python 3.8 or higher" -) - @freeze_time(FROZEN_TIME) def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time): diff --git a/metadata-ingestion/tests/integration/iceberg/test_iceberg.py b/metadata-ingestion/tests/integration/iceberg/test_iceberg.py index 65ede11c3f1c0..a9ab43169405d 100644 --- a/metadata-ingestion/tests/integration/iceberg/test_iceberg.py +++ b/metadata-ingestion/tests/integration/iceberg/test_iceberg.py @@ -1,5 +1,4 @@ import subprocess -import sys from typing import Any, Dict, List from unittest.mock import patch @@ -15,13 +14,7 @@ validate_all_providers_have_committed_successfully, ) -pytestmark = [ - pytest.mark.integration_batch_1, - # Skip tests if not on Python 3.8 or higher. 
- pytest.mark.skipif( - sys.version_info < (3, 8), reason="Requires python 3.8 or higher" - ), -] +pytestmark = pytest.mark.integration_batch_1 FROZEN_TIME = "2020-04-14 07:00:00" GMS_PORT = 8080 GMS_SERVER = f"http://localhost:{GMS_PORT}" diff --git a/metadata-ingestion/tests/integration/mlflow/test_mlflow_source.py b/metadata-ingestion/tests/integration/mlflow/test_mlflow_source.py index 76af666526555..155199d5a04e9 100644 --- a/metadata-ingestion/tests/integration/mlflow/test_mlflow_source.py +++ b/metadata-ingestion/tests/integration/mlflow/test_mlflow_source.py @@ -1,104 +1,106 @@ -import sys +from pathlib import Path +from typing import Any, Dict, TypeVar -if sys.version_info >= (3, 8): - from pathlib import Path - from typing import Any, Dict, TypeVar +import pytest +from mlflow import MlflowClient - import pytest - from mlflow import MlflowClient +from datahub.ingestion.run.pipeline import Pipeline +from tests.test_helpers import mce_helpers - from datahub.ingestion.run.pipeline import Pipeline - from tests.test_helpers import mce_helpers +T = TypeVar("T") - T = TypeVar("T") - @pytest.fixture - def tracking_uri(tmp_path: Path) -> str: - return str(tmp_path / "mlruns") +@pytest.fixture +def tracking_uri(tmp_path: Path) -> str: + return str(tmp_path / "mlruns") - @pytest.fixture - def sink_file_path(tmp_path: Path) -> str: - return str(tmp_path / "mlflow_source_mcps.json") - @pytest.fixture - def pipeline_config(tracking_uri: str, sink_file_path: str) -> Dict[str, Any]: - source_type = "mlflow" - return { - "run_id": "mlflow-source-test", - "source": { - "type": source_type, - "config": { - "tracking_uri": tracking_uri, - }, +@pytest.fixture +def sink_file_path(tmp_path: Path) -> str: + return str(tmp_path / "mlflow_source_mcps.json") + + +@pytest.fixture +def pipeline_config(tracking_uri: str, sink_file_path: str) -> Dict[str, Any]: + source_type = "mlflow" + return { + "run_id": "mlflow-source-test", + "source": { + "type": source_type, + "config": { + "tracking_uri": tracking_uri, }, - "sink": { - "type": "file", - "config": { - "filename": sink_file_path, - }, + }, + "sink": { + "type": "file", + "config": { + "filename": sink_file_path, }, - } + }, + } + + +@pytest.fixture +def generate_mlflow_data(tracking_uri: str) -> None: + client = MlflowClient(tracking_uri=tracking_uri) + experiment_name = "test-experiment" + run_name = "test-run" + model_name = "test-model" + test_experiment_id = client.create_experiment(experiment_name) + test_run = client.create_run( + experiment_id=test_experiment_id, + run_name=run_name, + ) + client.log_param( + run_id=test_run.info.run_id, + key="p", + value=1, + ) + client.log_metric( + run_id=test_run.info.run_id, + key="m", + value=0.85, + ) + client.create_registered_model( + name=model_name, + tags=dict( + model_id=1, + model_env="test", + ), + description="This a test registered model", + ) + client.create_model_version( + name=model_name, + source="dummy_dir/dummy_file", + run_id=test_run.info.run_id, + tags=dict(model_version_id=1), + ) + client.transition_model_version_stage( + name=model_name, + version="1", + stage="Archived", + ) - @pytest.fixture - def generate_mlflow_data(tracking_uri: str) -> None: - client = MlflowClient(tracking_uri=tracking_uri) - experiment_name = "test-experiment" - run_name = "test-run" - model_name = "test-model" - test_experiment_id = client.create_experiment(experiment_name) - test_run = client.create_run( - experiment_id=test_experiment_id, - run_name=run_name, - ) - client.log_param( - 
run_id=test_run.info.run_id, - key="p", - value=1, - ) - client.log_metric( - run_id=test_run.info.run_id, - key="m", - value=0.85, - ) - client.create_registered_model( - name=model_name, - tags=dict( - model_id=1, - model_env="test", - ), - description="This a test registered model", - ) - client.create_model_version( - name=model_name, - source="dummy_dir/dummy_file", - run_id=test_run.info.run_id, - tags=dict(model_version_id=1), - ) - client.transition_model_version_stage( - name=model_name, - version="1", - stage="Archived", - ) - def test_ingestion( - pytestconfig, - mock_time, - sink_file_path, - pipeline_config, - generate_mlflow_data, - ): - print(f"MCPs file path: {sink_file_path}") - golden_file_path = ( - pytestconfig.rootpath / "tests/integration/mlflow/mlflow_mcps_golden.json" - ) +def test_ingestion( + pytestconfig, + mock_time, + sink_file_path, + pipeline_config, + generate_mlflow_data, +): + print(f"MCPs file path: {sink_file_path}") + golden_file_path = ( + pytestconfig.rootpath / "tests/integration/mlflow/mlflow_mcps_golden.json" + ) - pipeline = Pipeline.create(pipeline_config) - pipeline.run() - pipeline.pretty_print_summary() - pipeline.raise_from_status() + pipeline = Pipeline.create(pipeline_config) + pipeline.run() + pipeline.pretty_print_summary() + pipeline.raise_from_status() - mce_helpers.check_golden_file( - pytestconfig=pytestconfig, - output_path=sink_file_path, - golden_path=golden_file_path, - ) + mce_helpers.check_golden_file( + pytestconfig=pytestconfig, + output_path=sink_file_path, + golden_path=golden_file_path, + ) diff --git a/metadata-ingestion/tests/integration/sql_server/test_sql_server.py b/metadata-ingestion/tests/integration/sql_server/test_sql_server.py index 5ed672d527264..f439a322c2677 100644 --- a/metadata-ingestion/tests/integration/sql_server/test_sql_server.py +++ b/metadata-ingestion/tests/integration/sql_server/test_sql_server.py @@ -1,6 +1,5 @@ import os import subprocess -import sys import time import pytest @@ -9,10 +8,6 @@ from tests.test_helpers.click_helpers import run_datahub_cmd from tests.test_helpers.docker_helpers import cleanup_image, wait_for_port -pytestmark = pytest.mark.skipif( - sys.version_info < (3, 8), reason="requires python 3.8 or higher" -) - @pytest.fixture(scope="module") def mssql_runner(docker_compose_runner, pytestconfig): diff --git a/metadata-ingestion/tests/unit/test_iceberg.py b/metadata-ingestion/tests/unit/test_iceberg.py index 768d4f958af1f..e2b463004f5a1 100644 --- a/metadata-ingestion/tests/unit/test_iceberg.py +++ b/metadata-ingestion/tests/unit/test_iceberg.py @@ -1,482 +1,477 @@ -import sys import uuid from decimal import Decimal from typing import Any, Optional import pytest from pydantic import ValidationError +from pyiceberg.schema import Schema +from pyiceberg.types import ( + BinaryType, + BooleanType, + DateType, + DecimalType, + DoubleType, + FixedType, + FloatType, + IcebergType, + IntegerType, + ListType, + LongType, + MapType, + NestedField, + PrimitiveType, + StringType, + StructType, + TimestampType, + TimestamptzType, + TimeType, + UUIDType, +) + +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.iceberg.iceberg import ( + IcebergProfiler, + IcebergSource, + IcebergSourceConfig, +) +from datahub.ingestion.source.iceberg.iceberg_common import IcebergCatalogConfig +from datahub.metadata.com.linkedin.pegasus2avro.schema import ArrayType, SchemaField +from datahub.metadata.schema_classes import ( + ArrayTypeClass, + BooleanTypeClass, + 
BytesTypeClass, + DateTypeClass, + FixedTypeClass, + NumberTypeClass, + RecordTypeClass, + StringTypeClass, + TimeTypeClass, +) -if sys.version_info >= (3, 8): - from pyiceberg.schema import Schema - from pyiceberg.types import ( - BinaryType, - BooleanType, - DateType, - DecimalType, - DoubleType, - FixedType, - FloatType, - IcebergType, - IntegerType, - ListType, - LongType, - MapType, - NestedField, - PrimitiveType, - StringType, - StructType, - TimestampType, - TimestamptzType, - TimeType, - UUIDType, - ) - from datahub.ingestion.api.common import PipelineContext - from datahub.ingestion.source.iceberg.iceberg import ( - IcebergProfiler, - IcebergSource, - IcebergSourceConfig, +def with_iceberg_source() -> IcebergSource: + catalog: IcebergCatalogConfig = IcebergCatalogConfig( + name="test", type="rest", config={} ) - from datahub.ingestion.source.iceberg.iceberg_common import IcebergCatalogConfig - from datahub.metadata.com.linkedin.pegasus2avro.schema import ArrayType, SchemaField - from datahub.metadata.schema_classes import ( - ArrayTypeClass, - BooleanTypeClass, - BytesTypeClass, - DateTypeClass, - FixedTypeClass, - NumberTypeClass, - RecordTypeClass, - StringTypeClass, - TimeTypeClass, + return IcebergSource( + ctx=PipelineContext(run_id="iceberg-source-test"), + config=IcebergSourceConfig(catalog=catalog), ) - pytestmark = pytest.mark.skipif( - sys.version_info < (3, 8), reason="requires python 3.8 or higher" + +def with_iceberg_profiler() -> IcebergProfiler: + iceberg_source_instance = with_iceberg_source() + return IcebergProfiler( + iceberg_source_instance.report, iceberg_source_instance.config.profiling ) - def with_iceberg_source() -> IcebergSource: - catalog: IcebergCatalogConfig = IcebergCatalogConfig( - name="test", type="rest", config={} - ) - return IcebergSource( - ctx=PipelineContext(run_id="iceberg-source-test"), - config=IcebergSourceConfig(catalog=catalog), - ) - def with_iceberg_profiler() -> IcebergProfiler: - iceberg_source_instance = with_iceberg_source() - return IcebergProfiler( - iceberg_source_instance.report, iceberg_source_instance.config.profiling - ) +def assert_field( + schema_field: SchemaField, + expected_description: Optional[str], + expected_nullable: bool, + expected_type: Any, +) -> None: + assert ( + schema_field.description == expected_description + ), f"Field description '{schema_field.description}' is different from expected description '{expected_description}'" + assert ( + schema_field.nullable == expected_nullable + ), f"Field nullable '{schema_field.nullable}' is different from expected nullable '{expected_nullable}'" + assert isinstance( + schema_field.type.type, expected_type + ), f"Field type {schema_field.type.type} is different from expected type {expected_type}" - def assert_field( - schema_field: SchemaField, - expected_description: Optional[str], - expected_nullable: bool, - expected_type: Any, - ) -> None: - assert ( - schema_field.description == expected_description - ), f"Field description '{schema_field.description}' is different from expected description '{expected_description}'" + +def test_config_no_catalog(): + """ + Test when no Iceberg catalog is provided. + """ + with pytest.raises(ValidationError, match="catalog"): + IcebergSourceConfig() # type: ignore + + +def test_config_catalog_not_configured(): + """ + Test when an Iceberg catalog is provided, but not properly configured. 
+ """ + with pytest.raises(ValidationError): + IcebergCatalogConfig() # type: ignore + + with pytest.raises(ValidationError, match="conf"): + IcebergCatalogConfig(type="a type") # type: ignore + + with pytest.raises(ValidationError, match="type"): + IcebergCatalogConfig(conf={}) # type: ignore + + +def test_config_for_tests(): + """ + Test valid iceberg source that will be used in unit tests. + """ + with_iceberg_source() + + +@pytest.mark.parametrize( + "iceberg_type, expected_schema_field_type", + [ + (BinaryType(), BytesTypeClass), + (BooleanType(), BooleanTypeClass), + (DateType(), DateTypeClass), + ( + DecimalType(3, 2), + NumberTypeClass, + ), + (DoubleType(), NumberTypeClass), + (FixedType(4), FixedTypeClass), + (FloatType(), NumberTypeClass), + (IntegerType(), NumberTypeClass), + (LongType(), NumberTypeClass), + (StringType(), StringTypeClass), + ( + TimestampType(), + TimeTypeClass, + ), + ( + TimestamptzType(), + TimeTypeClass, + ), + (TimeType(), TimeTypeClass), + ( + UUIDType(), + StringTypeClass, + ), + ], +) +def test_iceberg_primitive_type_to_schema_field( + iceberg_type: PrimitiveType, expected_schema_field_type: Any +) -> None: + """ + Test converting a primitive typed Iceberg field to a SchemaField + """ + iceberg_source_instance = with_iceberg_source() + for column in [ + NestedField( + 1, "required_field", iceberg_type, True, "required field documentation" + ), + NestedField( + 1, "optional_field", iceberg_type, False, "optional field documentation" + ), + ]: + schema = Schema(column) + schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema) assert ( - schema_field.nullable == expected_nullable - ), f"Field nullable '{schema_field.nullable}' is different from expected nullable '{expected_nullable}'" - assert isinstance( - schema_field.type.type, expected_type - ), f"Field type {schema_field.type.type} is different from expected type {expected_type}" - - def test_config_no_catalog(): - """ - Test when no Iceberg catalog is provided. - """ - with pytest.raises(ValidationError, match="catalog"): - IcebergSourceConfig() # type: ignore - - def test_config_catalog_not_configured(): - """ - Test when an Iceberg catalog is provided, but not properly configured. - """ - with pytest.raises(ValidationError): - IcebergCatalogConfig() # type: ignore - - with pytest.raises(ValidationError, match="conf"): - IcebergCatalogConfig(type="a type") # type: ignore - - with pytest.raises(ValidationError, match="type"): - IcebergCatalogConfig(conf={}) # type: ignore - - def test_config_for_tests(): - """ - Test valid iceberg source that will be used in unit tests. 
- """ - with_iceberg_source() - - @pytest.mark.parametrize( - "iceberg_type, expected_schema_field_type", - [ - (BinaryType(), BytesTypeClass), - (BooleanType(), BooleanTypeClass), - (DateType(), DateTypeClass), - ( - DecimalType(3, 2), - NumberTypeClass, - ), - (DoubleType(), NumberTypeClass), - (FixedType(4), FixedTypeClass), - (FloatType(), NumberTypeClass), - (IntegerType(), NumberTypeClass), - (LongType(), NumberTypeClass), - (StringType(), StringTypeClass), - ( - TimestampType(), - TimeTypeClass, - ), - ( - TimestamptzType(), - TimeTypeClass, - ), - (TimeType(), TimeTypeClass), - ( - UUIDType(), - StringTypeClass, - ), - ], - ) - def test_iceberg_primitive_type_to_schema_field( - iceberg_type: PrimitiveType, expected_schema_field_type: Any - ) -> None: - """ - Test converting a primitive typed Iceberg field to a SchemaField - """ + len(schema_fields) == 1 + ), f"Expected 1 field, but got {len(schema_fields)}" + assert_field( + schema_fields[0], + column.doc, + column.optional, + expected_schema_field_type, + ) + + +@pytest.mark.parametrize( + "iceberg_type, expected_array_nested_type", + [ + (BinaryType(), "bytes"), + (BooleanType(), "boolean"), + (DateType(), "date"), + ( + DecimalType(3, 2), + "decimal", + ), + (DoubleType(), "double"), + (FixedType(4), "fixed"), + (FloatType(), "float"), + (IntegerType(), "int"), + (LongType(), "long"), + (StringType(), "string"), + ( + TimestampType(), + "timestamp-micros", + ), + ( + TimestamptzType(), + "timestamp-micros", + ), + (TimeType(), "time-micros"), + ( + UUIDType(), + "uuid", + ), + ], +) +def test_iceberg_list_to_schema_field( + iceberg_type: PrimitiveType, expected_array_nested_type: Any +) -> None: + """ + Test converting a list typed Iceberg field to an ArrayType SchemaField, including the list nested type. 
+ """ + for list_column in [ + NestedField( + 1, + "listField", + ListType(2, iceberg_type, True), + True, + "required field, required element documentation", + ), + NestedField( + 1, + "listField", + ListType(2, iceberg_type, False), + True, + "required field, optional element documentation", + ), + NestedField( + 1, + "listField", + ListType(2, iceberg_type, True), + False, + "optional field, required element documentation", + ), + NestedField( + 1, + "listField", + ListType(2, iceberg_type, False), + False, + "optional field, optional element documentation", + ), + ]: iceberg_source_instance = with_iceberg_source() - for column in [ - NestedField( - 1, "required_field", iceberg_type, True, "required field documentation" - ), - NestedField( - 1, "optional_field", iceberg_type, False, "optional field documentation" - ), - ]: - schema = Schema(column) - schema_fields = iceberg_source_instance._get_schema_fields_for_schema( - schema - ) - assert ( - len(schema_fields) == 1 - ), f"Expected 1 field, but got {len(schema_fields)}" - assert_field( - schema_fields[0], - column.doc, - column.optional, - expected_schema_field_type, - ) - - @pytest.mark.parametrize( - "iceberg_type, expected_array_nested_type", - [ - (BinaryType(), "bytes"), - (BooleanType(), "boolean"), - (DateType(), "date"), - ( - DecimalType(3, 2), - "decimal", - ), - (DoubleType(), "double"), - (FixedType(4), "fixed"), - (FloatType(), "float"), - (IntegerType(), "int"), - (LongType(), "long"), - (StringType(), "string"), - ( - TimestampType(), - "timestamp-micros", - ), - ( - TimestamptzType(), - "timestamp-micros", - ), - (TimeType(), "time-micros"), - ( - UUIDType(), - "uuid", - ), - ], - ) - def test_iceberg_list_to_schema_field( - iceberg_type: PrimitiveType, expected_array_nested_type: Any - ) -> None: - """ - Test converting a list typed Iceberg field to an ArrayType SchemaField, including the list nested type. 
- """ - for list_column in [ - NestedField( - 1, - "listField", - ListType(2, iceberg_type, True), - True, - "required field, required element documentation", - ), - NestedField( - 1, - "listField", - ListType(2, iceberg_type, False), - True, - "required field, optional element documentation", - ), - NestedField( - 1, - "listField", - ListType(2, iceberg_type, True), - False, - "optional field, required element documentation", - ), - NestedField( - 1, - "listField", - ListType(2, iceberg_type, False), - False, - "optional field, optional element documentation", - ), - ]: - iceberg_source_instance = with_iceberg_source() - schema = Schema(list_column) - schema_fields = iceberg_source_instance._get_schema_fields_for_schema( - schema - ) - assert ( - len(schema_fields) == 1 - ), f"Expected 1 field, but got {len(schema_fields)}" - assert_field( - schema_fields[0], list_column.doc, list_column.optional, ArrayTypeClass - ) - assert isinstance( - schema_fields[0].type.type, ArrayType - ), f"Field type {schema_fields[0].type.type} was expected to be {ArrayType}" - arrayType: ArrayType = schema_fields[0].type.type - assert arrayType.nestedType == [ - expected_array_nested_type - ], f"List Field nested type {arrayType.nestedType} was expected to be {expected_array_nested_type}" - - @pytest.mark.parametrize( - "iceberg_type, expected_map_type", - [ - (BinaryType(), BytesTypeClass), - (BooleanType(), BooleanTypeClass), - (DateType(), DateTypeClass), - ( - DecimalType(3, 2), - NumberTypeClass, - ), - (DoubleType(), NumberTypeClass), - (FixedType(4), FixedTypeClass), - (FloatType(), NumberTypeClass), - (IntegerType(), NumberTypeClass), - (LongType(), NumberTypeClass), - (StringType(), StringTypeClass), - ( - TimestampType(), - TimeTypeClass, - ), - ( - TimestamptzType(), - TimeTypeClass, - ), - (TimeType(), TimeTypeClass), - ( - UUIDType(), - StringTypeClass, - ), - ], - ) - def test_iceberg_map_to_schema_field( - iceberg_type: PrimitiveType, expected_map_type: Any - ) -> None: - """ - Test converting a map typed Iceberg field to a MapType SchemaField, where the key is the same type as the value. - """ - for map_column in [ - NestedField( - 1, - "mapField", - MapType(11, iceberg_type, 12, iceberg_type, True), - True, - "required field, required value documentation", - ), - NestedField( - 1, - "mapField", - MapType(11, iceberg_type, 12, iceberg_type, False), - True, - "required field, optional value documentation", - ), - NestedField( - 1, - "mapField", - MapType(11, iceberg_type, 12, iceberg_type, True), - False, - "optional field, required value documentation", - ), - NestedField( - 1, - "mapField", - MapType(11, iceberg_type, 12, iceberg_type, False), - False, - "optional field, optional value documentation", - ), - ]: - iceberg_source_instance = with_iceberg_source() - schema = Schema(map_column) - schema_fields = iceberg_source_instance._get_schema_fields_for_schema( - schema - ) - # Converting an Iceberg Map type will be done by creating an array of struct(key, value) records. - # The first field will be the array. 
- assert ( - len(schema_fields) == 3 - ), f"Expected 3 fields, but got {len(schema_fields)}" - assert_field( - schema_fields[0], map_column.doc, map_column.optional, ArrayTypeClass - ) - - # The second field will be the key type - assert_field(schema_fields[1], None, False, expected_map_type) - - # The third field will be the value type - assert_field( - schema_fields[2], - None, - not map_column.field_type.value_required, - expected_map_type, - ) - - @pytest.mark.parametrize( - "iceberg_type, expected_schema_field_type", - [ - (BinaryType(), BytesTypeClass), - (BooleanType(), BooleanTypeClass), - (DateType(), DateTypeClass), - ( - DecimalType(3, 2), - NumberTypeClass, - ), - (DoubleType(), NumberTypeClass), - (FixedType(4), FixedTypeClass), - (FloatType(), NumberTypeClass), - (IntegerType(), NumberTypeClass), - (LongType(), NumberTypeClass), - (StringType(), StringTypeClass), - ( - TimestampType(), - TimeTypeClass, - ), - ( - TimestamptzType(), - TimeTypeClass, - ), - (TimeType(), TimeTypeClass), - ( - UUIDType(), - StringTypeClass, - ), - ], - ) - def test_iceberg_struct_to_schema_field( - iceberg_type: PrimitiveType, expected_schema_field_type: Any - ) -> None: - """ - Test converting a struct typed Iceberg field to a RecordType SchemaField. - """ - field1 = NestedField(11, "field1", iceberg_type, True, "field documentation") - struct_column = NestedField( - 1, "structField", StructType(field1), True, "struct documentation" + schema = Schema(list_column) + schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema) + assert ( + len(schema_fields) == 1 + ), f"Expected 1 field, but got {len(schema_fields)}" + assert_field( + schema_fields[0], list_column.doc, list_column.optional, ArrayTypeClass ) + assert isinstance( + schema_fields[0].type.type, ArrayType + ), f"Field type {schema_fields[0].type.type} was expected to be {ArrayType}" + arrayType: ArrayType = schema_fields[0].type.type + assert arrayType.nestedType == [ + expected_array_nested_type + ], f"List Field nested type {arrayType.nestedType} was expected to be {expected_array_nested_type}" + + +@pytest.mark.parametrize( + "iceberg_type, expected_map_type", + [ + (BinaryType(), BytesTypeClass), + (BooleanType(), BooleanTypeClass), + (DateType(), DateTypeClass), + ( + DecimalType(3, 2), + NumberTypeClass, + ), + (DoubleType(), NumberTypeClass), + (FixedType(4), FixedTypeClass), + (FloatType(), NumberTypeClass), + (IntegerType(), NumberTypeClass), + (LongType(), NumberTypeClass), + (StringType(), StringTypeClass), + ( + TimestampType(), + TimeTypeClass, + ), + ( + TimestamptzType(), + TimeTypeClass, + ), + (TimeType(), TimeTypeClass), + ( + UUIDType(), + StringTypeClass, + ), + ], +) +def test_iceberg_map_to_schema_field( + iceberg_type: PrimitiveType, expected_map_type: Any +) -> None: + """ + Test converting a map typed Iceberg field to a MapType SchemaField, where the key is the same type as the value. 
+ """ + for map_column in [ + NestedField( + 1, + "mapField", + MapType(11, iceberg_type, 12, iceberg_type, True), + True, + "required field, required value documentation", + ), + NestedField( + 1, + "mapField", + MapType(11, iceberg_type, 12, iceberg_type, False), + True, + "required field, optional value documentation", + ), + NestedField( + 1, + "mapField", + MapType(11, iceberg_type, 12, iceberg_type, True), + False, + "optional field, required value documentation", + ), + NestedField( + 1, + "mapField", + MapType(11, iceberg_type, 12, iceberg_type, False), + False, + "optional field, optional value documentation", + ), + ]: iceberg_source_instance = with_iceberg_source() - schema = Schema(struct_column) + schema = Schema(map_column) schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema) + # Converting an Iceberg Map type will be done by creating an array of struct(key, value) records. + # The first field will be the array. assert ( - len(schema_fields) == 2 - ), f"Expected 2 fields, but got {len(schema_fields)}" + len(schema_fields) == 3 + ), f"Expected 3 fields, but got {len(schema_fields)}" assert_field( - schema_fields[0], struct_column.doc, struct_column.optional, RecordTypeClass + schema_fields[0], map_column.doc, map_column.optional, ArrayTypeClass ) + + # The second field will be the key type + assert_field(schema_fields[1], None, False, expected_map_type) + + # The third field will be the value type assert_field( - schema_fields[1], field1.doc, field1.optional, expected_schema_field_type + schema_fields[2], + None, + not map_column.field_type.value_required, + expected_map_type, ) - @pytest.mark.parametrize( - "value_type, value, expected_value", - [ - (BinaryType(), bytes([1, 2, 3, 4, 5]), "b'\\x01\\x02\\x03\\x04\\x05'"), - (BooleanType(), True, "True"), - (DateType(), 19543, "2023-07-05"), - (DecimalType(3, 2), Decimal((0, (3, 1, 4), -2)), "3.14"), - (DoubleType(), 3.4, "3.4"), - (FixedType(4), bytes([1, 2, 3, 4]), "b'\\x01\\x02\\x03\\x04'"), - (FloatType(), 3.4, "3.4"), - (IntegerType(), 3, "3"), - (LongType(), 4294967295000, "4294967295000"), - (StringType(), "a string", "a string"), - ( - TimestampType(), - 1688559488157000, - "2023-07-05T12:18:08.157000", - ), - ( - TimestamptzType(), - 1688559488157000, - "2023-07-05T12:18:08.157000+00:00", - ), - (TimeType(), 40400000000, "11:13:20"), - ( - UUIDType(), - uuid.UUID("00010203-0405-0607-0809-0a0b0c0d0e0f"), - "00010203-0405-0607-0809-0a0b0c0d0e0f", - ), - ], + +@pytest.mark.parametrize( + "iceberg_type, expected_schema_field_type", + [ + (BinaryType(), BytesTypeClass), + (BooleanType(), BooleanTypeClass), + (DateType(), DateTypeClass), + ( + DecimalType(3, 2), + NumberTypeClass, + ), + (DoubleType(), NumberTypeClass), + (FixedType(4), FixedTypeClass), + (FloatType(), NumberTypeClass), + (IntegerType(), NumberTypeClass), + (LongType(), NumberTypeClass), + (StringType(), StringTypeClass), + ( + TimestampType(), + TimeTypeClass, + ), + ( + TimestamptzType(), + TimeTypeClass, + ), + (TimeType(), TimeTypeClass), + ( + UUIDType(), + StringTypeClass, + ), + ], +) +def test_iceberg_struct_to_schema_field( + iceberg_type: PrimitiveType, expected_schema_field_type: Any +) -> None: + """ + Test converting a struct typed Iceberg field to a RecordType SchemaField. 
+ """ + field1 = NestedField(11, "field1", iceberg_type, True, "field documentation") + struct_column = NestedField( + 1, "structField", StructType(field1), True, "struct documentation" + ) + iceberg_source_instance = with_iceberg_source() + schema = Schema(struct_column) + schema_fields = iceberg_source_instance._get_schema_fields_for_schema(schema) + assert len(schema_fields) == 2, f"Expected 2 fields, but got {len(schema_fields)}" + assert_field( + schema_fields[0], struct_column.doc, struct_column.optional, RecordTypeClass + ) + assert_field( + schema_fields[1], field1.doc, field1.optional, expected_schema_field_type ) - def test_iceberg_profiler_value_render( - value_type: IcebergType, value: Any, expected_value: Optional[str] - ) -> None: - iceberg_profiler_instance = with_iceberg_profiler() - assert ( - iceberg_profiler_instance._render_value("a.dataset", value_type, value) - == expected_value - ) - def test_avro_decimal_bytes_nullable() -> None: - """ - The following test exposes a problem with decimal (bytes) not preserving extra attributes like _nullable. Decimal (fixed) and Boolean for example do. - NOTE: This bug was by-passed by mapping the Decimal type to fixed instead of bytes. - """ - import avro.schema - - decimal_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "bytes", "precision": 3, "scale": 2, "logicalType": "decimal", "native_data_type": "decimal(3, 2)", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}""" - decimal_avro_schema = avro.schema.parse(decimal_avro_schema_string) - print("\nDecimal (bytes)") - print( - f"Original avro schema string: {decimal_avro_schema_string}" - ) - print( - f"After avro parsing, _nullable attribute is missing: {decimal_avro_schema}" - ) - decimal_fixed_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "fixed", "logicalType": "decimal", "precision": 3, "scale": 2, "native_data_type": "decimal(3, 2)", "_nullable": false, "name": "bogusName", "size": 16}, "name": "required_field", "doc": "required field documentation"}]}""" - decimal_fixed_avro_schema = avro.schema.parse(decimal_fixed_avro_schema_string) - print("\nDecimal (fixed)") - print( - f"Original avro schema string: {decimal_fixed_avro_schema_string}" - ) - print( - f"After avro parsing, _nullable attribute is preserved: {decimal_fixed_avro_schema}" - ) +@pytest.mark.parametrize( + "value_type, value, expected_value", + [ + (BinaryType(), bytes([1, 2, 3, 4, 5]), "b'\\x01\\x02\\x03\\x04\\x05'"), + (BooleanType(), True, "True"), + (DateType(), 19543, "2023-07-05"), + (DecimalType(3, 2), Decimal((0, (3, 1, 4), -2)), "3.14"), + (DoubleType(), 3.4, "3.4"), + (FixedType(4), bytes([1, 2, 3, 4]), "b'\\x01\\x02\\x03\\x04'"), + (FloatType(), 3.4, "3.4"), + (IntegerType(), 3, "3"), + (LongType(), 4294967295000, "4294967295000"), + (StringType(), "a string", "a string"), + ( + TimestampType(), + 1688559488157000, + "2023-07-05T12:18:08.157000", + ), + ( + TimestamptzType(), + 1688559488157000, + "2023-07-05T12:18:08.157000+00:00", + ), + (TimeType(), 40400000000, "11:13:20"), + ( + UUIDType(), + uuid.UUID("00010203-0405-0607-0809-0a0b0c0d0e0f"), + "00010203-0405-0607-0809-0a0b0c0d0e0f", + ), + ], +) +def test_iceberg_profiler_value_render( + value_type: IcebergType, value: Any, expected_value: Optional[str] +) -> None: + iceberg_profiler_instance = with_iceberg_profiler() + assert ( + iceberg_profiler_instance._render_value("a.dataset", value_type, value) + == 
expected_value + ) - boolean_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "boolean", "native_data_type": "boolean", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}""" - boolean_avro_schema = avro.schema.parse(boolean_avro_schema_string) - print("\nBoolean") - print( - f"Original avro schema string: {boolean_avro_schema_string}" - ) - print( - f"After avro parsing, _nullable attribute is preserved: {boolean_avro_schema}" - ) + +def test_avro_decimal_bytes_nullable() -> None: + """ + The following test exposes a problem with decimal (bytes) not preserving extra attributes like _nullable. Decimal (fixed) and Boolean for example do. + NOTE: This bug was by-passed by mapping the Decimal type to fixed instead of bytes. + """ + import avro.schema + + decimal_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "bytes", "precision": 3, "scale": 2, "logicalType": "decimal", "native_data_type": "decimal(3, 2)", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}""" + decimal_avro_schema = avro.schema.parse(decimal_avro_schema_string) + print("\nDecimal (bytes)") + print( + f"Original avro schema string: {decimal_avro_schema_string}" + ) + print(f"After avro parsing, _nullable attribute is missing: {decimal_avro_schema}") + + decimal_fixed_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "fixed", "logicalType": "decimal", "precision": 3, "scale": 2, "native_data_type": "decimal(3, 2)", "_nullable": false, "name": "bogusName", "size": 16}, "name": "required_field", "doc": "required field documentation"}]}""" + decimal_fixed_avro_schema = avro.schema.parse(decimal_fixed_avro_schema_string) + print("\nDecimal (fixed)") + print( + f"Original avro schema string: {decimal_fixed_avro_schema_string}" + ) + print( + f"After avro parsing, _nullable attribute is preserved: {decimal_fixed_avro_schema}" + ) + + boolean_avro_schema_string = """{"type": "record", "name": "__struct_", "fields": [{"type": {"type": "boolean", "native_data_type": "boolean", "_nullable": false}, "name": "required_field", "doc": "required field documentation"}]}""" + boolean_avro_schema = avro.schema.parse(boolean_avro_schema_string) + print("\nBoolean") + print( + f"Original avro schema string: {boolean_avro_schema_string}" + ) + print( + f"After avro parsing, _nullable attribute is preserved: {boolean_avro_schema}" + ) diff --git a/metadata-ingestion/tests/unit/test_mlflow_source.py b/metadata-ingestion/tests/unit/test_mlflow_source.py index 97b5afd3d6a4e..374816055b216 100644 --- a/metadata-ingestion/tests/unit/test_mlflow_source.py +++ b/metadata-ingestion/tests/unit/test_mlflow_source.py @@ -1,133 +1,140 @@ -import sys +import datetime +from pathlib import Path +from typing import Any, TypeVar, Union + +import pytest +from mlflow import MlflowClient +from mlflow.entities.model_registry import RegisteredModel +from mlflow.entities.model_registry.model_version import ModelVersion +from mlflow.store.entities import PagedList + +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.mlflow import MLflowConfig, MLflowSource + +T = TypeVar("T") + + +@pytest.fixture +def tracking_uri(tmp_path: Path) -> str: + return str(tmp_path / "mlruns") + + +@pytest.fixture +def source(tracking_uri: str) -> MLflowSource: + return MLflowSource( + ctx=PipelineContext(run_id="mlflow-source-test"), + 
config=MLflowConfig(tracking_uri=tracking_uri), + ) + + +@pytest.fixture +def registered_model(source: MLflowSource) -> RegisteredModel: + model_name = "abc" + return RegisteredModel(name=model_name) + + +@pytest.fixture +def model_version( + source: MLflowSource, + registered_model: RegisteredModel, +) -> ModelVersion: + version = "1" + return ModelVersion( + name=registered_model.name, + version=version, + creation_timestamp=datetime.datetime.now(), + ) + + +def dummy_search_func(page_token: Union[None, str], **kwargs: Any) -> PagedList[T]: + dummy_pages = dict( + page_1=PagedList(items=["a", "b"], token="page_2"), + page_2=PagedList(items=["c", "d"], token="page_3"), + page_3=PagedList(items=["e"], token=None), + ) + if page_token is None: + page_to_return = dummy_pages["page_1"] + else: + page_to_return = dummy_pages[page_token] + if kwargs.get("case", "") == "upper": + page_to_return = PagedList( + items=[e.upper() for e in page_to_return.to_list()], + token=page_to_return.token, + ) + return page_to_return -if sys.version_info >= (3, 8): - import datetime - from pathlib import Path - from typing import Any, TypeVar, Union - import pytest - from mlflow import MlflowClient - from mlflow.entities.model_registry import RegisteredModel - from mlflow.entities.model_registry.model_version import ModelVersion - from mlflow.store.entities import PagedList +def test_stages(source): + mlflow_registered_model_stages = { + "Production", + "Staging", + "Archived", + None, + } + workunits = source._get_tags_workunits() + names = [wu.get_metadata()["metadata"].aspect.name for wu in workunits] - from datahub.ingestion.api.common import PipelineContext - from datahub.ingestion.source.mlflow import MLflowConfig, MLflowSource + assert len(names) == len(mlflow_registered_model_stages) + assert set(names) == { + "mlflow_" + str(stage).lower() for stage in mlflow_registered_model_stages + } - T = TypeVar("T") - @pytest.fixture - def tracking_uri(tmp_path: Path) -> str: - return str(tmp_path / "mlruns") +def test_config_model_name_separator(source, model_version): + name_version_sep = "+" + source.config.model_name_separator = name_version_sep + expected_model_name = ( + f"{model_version.name}{name_version_sep}{model_version.version}" + ) + expected_urn = f"urn:li:mlModel:(urn:li:dataPlatform:mlflow,{expected_model_name},{source.config.env})" - @pytest.fixture - def source(tracking_uri: str) -> MLflowSource: - return MLflowSource( - ctx=PipelineContext(run_id="mlflow-source-test"), - config=MLflowConfig(tracking_uri=tracking_uri), - ) + urn = source._make_ml_model_urn(model_version) - @pytest.fixture - def registered_model(source: MLflowSource) -> RegisteredModel: - model_name = "abc" - return RegisteredModel(name=model_name) - - @pytest.fixture - def model_version( - source: MLflowSource, - registered_model: RegisteredModel, - ) -> ModelVersion: - version = "1" - return ModelVersion( - name=registered_model.name, - version=version, - creation_timestamp=datetime.datetime.now(), - ) + assert urn == expected_urn - def dummy_search_func(page_token: Union[None, str], **kwargs: Any) -> PagedList[T]: - dummy_pages = dict( - page_1=PagedList(items=["a", "b"], token="page_2"), - page_2=PagedList(items=["c", "d"], token="page_3"), - page_3=PagedList(items=["e"], token=None), - ) - if page_token is None: - page_to_return = dummy_pages["page_1"] - else: - page_to_return = dummy_pages[page_token] - if kwargs.get("case", "") == "upper": - page_to_return = PagedList( - items=[e.upper() for e in page_to_return.to_list()], 
- token=page_to_return.token, - ) - return page_to_return - - def test_stages(source): - mlflow_registered_model_stages = { - "Production", - "Staging", - "Archived", - None, - } - workunits = source._get_tags_workunits() - names = [wu.get_metadata()["metadata"].aspect.name for wu in workunits] - - assert len(names) == len(mlflow_registered_model_stages) - assert set(names) == { - "mlflow_" + str(stage).lower() for stage in mlflow_registered_model_stages - } - - def test_config_model_name_separator(source, model_version): - name_version_sep = "+" - source.config.model_name_separator = name_version_sep - expected_model_name = ( - f"{model_version.name}{name_version_sep}{model_version.version}" - ) - expected_urn = f"urn:li:mlModel:(urn:li:dataPlatform:mlflow,{expected_model_name},{source.config.env})" - urn = source._make_ml_model_urn(model_version) +def test_model_without_run(source, registered_model, model_version): + run = source._get_mlflow_run(model_version) + wu = source._get_ml_model_properties_workunit( + registered_model=registered_model, + model_version=model_version, + run=run, + ) + aspect = wu.get_metadata()["metadata"].aspect - assert urn == expected_urn + assert aspect.hyperParams is None + assert aspect.trainingMetrics is None - def test_model_without_run(source, registered_model, model_version): - run = source._get_mlflow_run(model_version) - wu = source._get_ml_model_properties_workunit( - registered_model=registered_model, - model_version=model_version, - run=run, - ) - aspect = wu.get_metadata()["metadata"].aspect - assert aspect.hyperParams is None - assert aspect.trainingMetrics is None +def test_traverse_mlflow_search_func(source): + expected_items = ["a", "b", "c", "d", "e"] - def test_traverse_mlflow_search_func(source): - expected_items = ["a", "b", "c", "d", "e"] + items = list(source._traverse_mlflow_search_func(dummy_search_func)) - items = list(source._traverse_mlflow_search_func(dummy_search_func)) + assert items == expected_items - assert items == expected_items - def test_traverse_mlflow_search_func_with_kwargs(source): - expected_items = ["A", "B", "C", "D", "E"] +def test_traverse_mlflow_search_func_with_kwargs(source): + expected_items = ["A", "B", "C", "D", "E"] + + items = list(source._traverse_mlflow_search_func(dummy_search_func, case="upper")) + + assert items == expected_items - items = list( - source._traverse_mlflow_search_func(dummy_search_func, case="upper") - ) - assert items == expected_items +def test_make_external_link_local(source, model_version): + expected_url = None - def test_make_external_link_local(source, model_version): - expected_url = None + url = source._make_external_url(model_version) - url = source._make_external_url(model_version) + assert url == expected_url - assert url == expected_url - def test_make_external_link_remote(source, model_version): - tracking_uri_remote = "https://dummy-mlflow-tracking-server.org" - source.client = MlflowClient(tracking_uri=tracking_uri_remote) - expected_url = f"{tracking_uri_remote}/#/models/{model_version.name}/versions/{model_version.version}" +def test_make_external_link_remote(source, model_version): + tracking_uri_remote = "https://dummy-mlflow-tracking-server.org" + source.client = MlflowClient(tracking_uri=tracking_uri_remote) + expected_url = f"{tracking_uri_remote}/#/models/{model_version.name}/versions/{model_version.version}" - url = source._make_external_url(model_version) + url = source._make_external_url(model_version) - assert url == expected_url + assert url == expected_url
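
The `test_traverse_mlflow_search_func` tests above exercise MLflow's token-based pagination: each call to a search function returns a `PagedList` whose `token` names the next page, and traversal keeps calling until no token comes back. Below is a minimal, standalone sketch of that same pattern. The names `traverse_paged_search` and `fake_search` are hypothetical, introduced only for this illustration and not part of the DataHub or MLflow APIs; the `PagedList(items=..., token=...)` constructor, `.to_list()`, and `.token` are used exactly as in the tests above.

```python
from typing import Any, Callable, Iterator, Optional

from mlflow.store.entities import PagedList


def traverse_paged_search(
    search_func: Callable[..., PagedList],
    **kwargs: Any,
) -> Iterator[Any]:
    """Yield every item from a token-paginated search function, one page at a time."""
    next_token: Optional[str] = None
    while True:
        page = search_func(page_token=next_token, **kwargs)
        yield from page.to_list()
        next_token = page.token
        if not next_token:
            break


def fake_search(page_token: Optional[str] = None, **kwargs: Any) -> PagedList:
    """Two fixed pages, chained by the token returned alongside each page."""
    pages = {
        None: PagedList(items=["a", "b"], token="page_2"),
        "page_2": PagedList(items=["c"], token=None),
    }
    return pages[page_token]


# Consuming the generator eagerly mirrors how the tests wrap the source's
# _traverse_mlflow_search_func in list(...).
assert list(traverse_paged_search(fake_search)) == ["a", "b", "c"]
```

Driving the traversal as a generator keeps memory bounded to a single page at a time, which is presumably why the tests only ever materialize the results with `list(...)` at the call site.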