-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(ingestion/prefect-plugin): fixed the unit tests #10643
Changes from 32 commits
99aae38
d64102d
9c71601
a876e87
9a72681
95347e1
68c6644
6dc9da9
4e32e0e
9290837
aaad752
9884785
08bc3e0
41072df
7662b58
33a42a2
adb8adb
870d5ee
70b1bf2
9c80098
5556927
f89d6c0
04881b1
54c012d
3839f2e
eae4268
24ccf17
8bb5e8d
5d54ff9
2638848
654da76
9b2a558
5b6c2c8
014a340
edffb1d
297fa62
889e4fb
2e1d64c
9e8f2a1
6a701c9
8b9548f
0422477
c134e97
0bef326
c096add
f2a05a9
706193e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
name: Prefect Plugin | ||
on: | ||
push: | ||
branches: | ||
- master | ||
paths: | ||
- ".github/workflows/prefect-plugin.yml" | ||
- "metadata-ingestion-modules/prefect-plugin/**" | ||
- "metadata-ingestion/**" | ||
- "metadata-models/**" | ||
pull_request: | ||
branches: | ||
- "**" | ||
paths: | ||
- ".github/workflows/prefect-plugin.yml" | ||
- "metadata-ingestion-modules/prefect-plugin/**" | ||
- "metadata-ingestion/**" | ||
- "metadata-models/**" | ||
release: | ||
types: [published] | ||
|
||
concurrency: | ||
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} | ||
cancel-in-progress: true | ||
|
||
jobs: | ||
prefect-plugin: | ||
runs-on: ubuntu-latest | ||
env: | ||
SPARK_VERSION: 3.0.3 | ||
DATAHUB_TELEMETRY_ENABLED: false | ||
strategy: | ||
matrix: | ||
python-version: ["3.8", "3.9", "3.10"] | ||
include: | ||
- python-version: "3.8" | ||
- python-version: "3.9" | ||
- python-version: "3.10" | ||
fail-fast: false | ||
steps: | ||
- name: Set up JDK 17 | ||
uses: actions/setup-java@v3 | ||
with: | ||
distribution: "zulu" | ||
java-version: 17 | ||
- uses: gradle/gradle-build-action@v2 | ||
- uses: actions/checkout@v3 | ||
- uses: actions/setup-python@v4 | ||
with: | ||
python-version: ${{ matrix.python-version }} | ||
cache: "pip" | ||
- name: Install dependencies | ||
run: ./metadata-ingestion/scripts/install_deps.sh | ||
- name: Install prefect package | ||
run: ./gradlew :metadata-ingestion-modules:prefect-plugin:lint :metadata-ingestion-modules:prefect-plugin:testQuick | ||
- name: pip freeze show list installed | ||
if: always() | ||
run: source metadata-ingestion-modules/prefect-plugin/venv/bin/activate && pip freeze | ||
- uses: actions/upload-artifact@v3 | ||
if: ${{ always() && matrix.python-version == '3.10'}} | ||
with: | ||
name: Test Results (Prefect Plugin ${{ matrix.python-version}}) | ||
path: | | ||
**/build/reports/tests/test/** | ||
**/build/test-results/test/** | ||
**/junit.*.xml | ||
!**/binary/** | ||
- name: Upload coverage to Codecov | ||
if: always() | ||
uses: codecov/codecov-action@v3 | ||
with: | ||
token: ${{ secrets.CODECOV_TOKEN }} | ||
directory: . | ||
fail_ci_if_error: false | ||
flags: prefect,prefect-${{ matrix.extra_pip_extras }} | ||
name: pytest-prefect-${{ matrix.python-version }} | ||
verbose: true | ||
|
||
event-file: | ||
runs-on: ubuntu-latest | ||
steps: | ||
- name: Upload | ||
uses: actions/upload-artifact@v3 | ||
with: | ||
name: Event File | ||
path: ${{ github.event_path }} |
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,135 @@ | ||||||||||
# Prefect Integration | ||||||||||
|
||||||||||
DataHub supports integration of | ||||||||||
|
||||||||||
- Prefect flow and task metadata | ||||||||||
- Flow run and Task run information as well as | ||||||||||
- Lineage information when present | ||||||||||
|
||||||||||
## What is Prefect Datahub Block? | ||||||||||
|
||||||||||
Blocks are primitive within Prefect that enable the storage of configuration and provide an interface for interacting with external systems. We integrated `prefect-datahub` block which use [Datahub Rest](../../metadata-ingestion/sink_docs/datahub.md#datahub-rest) emitter to emit metadata events while running prefect flow. | ||||||||||
|
||||||||||
## Prerequisites to use Prefect Datahub Block | ||||||||||
|
||||||||||
1. You need to use either Prefect Cloud (recommended) or the self hosted Prefect server. | ||||||||||
2. Refer [Cloud Quickstart](https://docs.prefect.io/latest/getting-started/quickstart/) to setup Prefect Cloud. | ||||||||||
3. Refer [Host Prefect server](https://docs.prefect.io/latest/guides/host/) to setup self hosted Prefect server. | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct spelling: "self-hosted" The term "self hosted" should be hyphenated. - setup self hosted Prefect server.
+ setup self-hosted Prefect server. Committable suggestion
Suggested change
ToolsLanguageTool
|
||||||||||
4. Make sure the Prefect api url is set correctly. You can check it by running below command: | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add missing article Add "the" before "below command". - running below command:
+ running the below command: Committable suggestion
Suggested change
ToolsLanguageTool
Markdownlint
|
||||||||||
```shell | ||||||||||
prefect profile inspect | ||||||||||
``` | ||||||||||
5. If you are using Prefect Cloud, the API URL should be set as `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`. | ||||||||||
6. If you are using a self-hosted Prefect server, the API URL should be set as `http://<host>:<port>/api`. | ||||||||||
|
||||||||||
## Setup | ||||||||||
|
||||||||||
### Installation | ||||||||||
|
||||||||||
Install `prefect-datahub` with `pip`: | ||||||||||
|
||||||||||
```shell | ||||||||||
pip install 'prefect-datahub' | ||||||||||
``` | ||||||||||
|
||||||||||
Requires an installation of Python 3.7+. | ||||||||||
|
||||||||||
### Saving configurations to a block | ||||||||||
|
||||||||||
This is a one-time activity, where you can save the configuration on the [Prefect block document store](https://docs.prefect.io/latest/concepts/blocks/#saving-blocks). | ||||||||||
While saving you can provide below configurations. Default value will get set if not provided while saving the configuration to block. | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a comma after the introductory phrase. A comma is needed after "While saving". - While saving you can provide below configurations.
+ While saving, you can provide below configurations. Committable suggestion
Suggested change
ToolsLanguageTool
|
||||||||||
|
||||||||||
Config | Type | Default | Description | ||||||||||
--- | --- | --- | --- | ||||||||||
datahub_rest_url | `str` | *http://localhost:8080* | DataHub GMS REST URL | ||||||||||
env | `str` | *PROD* | The environment that all assets produced by this orchestrator belong to. For more detail and possible values refer [here](https://datahubproject.io/docs/graphql/enums/#fabrictype). | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add comma after introductory phrase A comma is needed after "For more detail". - For more detail and possible values refer [here]
+ For more detail and possible values, refer [here] Committable suggestion
Suggested change
ToolsLanguageTool
|
||||||||||
platform_instance | `str` | *None* | The instance of the platform that all assets produced by this recipe belong to. For more detail please refer [here](https://datahubproject.io/docs/platform-instances/). | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add comma after introductory phrase A comma is needed after "For more detail". - For more detail please refer [here]
+ For more detail, please refer [here] Committable suggestion
Suggested change
ToolsLanguageTool
|
||||||||||
|
||||||||||
```python | ||||||||||
from prefect_datahub.datahub_emitter import DatahubEmitter | ||||||||||
DatahubEmitter( | ||||||||||
datahub_rest_url="http://localhost:8080", | ||||||||||
env="PROD", | ||||||||||
platform_instance="local_prefect" | ||||||||||
).save("BLOCK-NAME-PLACEHOLDER") | ||||||||||
``` | ||||||||||
|
||||||||||
Congrats! You can now load the saved block to use your configurations in your Flow code: | ||||||||||
|
||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove trailing space There is a trailing space at the end of this line. - Congrats! You can now load the saved block to use your configurations in your Flow code:
+ Congrats! You can now load the saved block to use your configurations in your Flow code:
ToolsMarkdownlint
|
||||||||||
```python | ||||||||||
from prefect_datahub.datahub_emitter import DatahubEmitter | ||||||||||
DatahubEmitter.load("BLOCK-NAME-PLACEHOLDER") | ||||||||||
``` | ||||||||||
|
||||||||||
!!! info "Registering blocks" | ||||||||||
|
||||||||||
Register blocks in this module to | ||||||||||
[view and edit them](https://docs.prefect.io/ui/blocks/) | ||||||||||
on Prefect Cloud: | ||||||||||
|
||||||||||
```bash | ||||||||||
prefect block register -m prefect_datahub | ||||||||||
``` | ||||||||||
|
||||||||||
### Load the saved block in prefect workflows | ||||||||||
|
||||||||||
After installing `prefect-datahub` and [saving the configution](#saving-configurations-to-a-block), you can easily use it within your prefect workflows to help you emit metadata event as show below! | ||||||||||
|
||||||||||
```python | ||||||||||
from prefect import flow, task | ||||||||||
from prefect_datahub.dataset import Dataset | ||||||||||
from prefect_datahub.datahub_emitter import DatahubEmitter | ||||||||||
|
||||||||||
datahub_emitter = DatahubEmitter.load("MY_BLOCK_NAME") | ||||||||||
|
||||||||||
@task(name="Transform", description="Transform the data") | ||||||||||
def transform(data): | ||||||||||
data = data.split(" ") | ||||||||||
datahub_emitter.add_task( | ||||||||||
inputs=[Dataset("snowflake", "mydb.schema.tableA")], | ||||||||||
outputs=[Dataset("snowflake", "mydb.schema.tableC")], | ||||||||||
) | ||||||||||
return data | ||||||||||
|
||||||||||
@flow(name="ETL flow", description="Extract transform load flow") | ||||||||||
def etl(): | ||||||||||
data = transform("This is data") | ||||||||||
datahub_emitter.emit_flow() | ||||||||||
``` | ||||||||||
|
||||||||||
**Note**: To emit the tasks, user compulsory need to emit flow. Otherwise nothing will get emit. | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a comma after "Otherwise" A comma is needed after "Otherwise". - Otherwise nothing will get emit.
+ Otherwise, nothing will get emit. Committable suggestion
Suggested change
ToolsLanguageTool
|
||||||||||
|
||||||||||
## Concept mapping | ||||||||||
|
||||||||||
Prefect concepts are documented [here](https://docs.prefect.io/latest/concepts/), and datahub concepts are documented [here](https://datahubproject.io/docs/what-is-datahub/datahub-concepts). | ||||||||||
|
||||||||||
Prefect Concept | DataHub Concept | ||||||||||
--- | --- | ||||||||||
[Flow](https://docs.prefect.io/latest/concepts/flows/) | [DataFlow](https://datahubproject.io/docs/generated/metamodel/entities/dataflow/) | ||||||||||
[Flow Run](https://docs.prefect.io/latest/concepts/flows/#flow-runs) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) | ||||||||||
[Task](https://docs.prefect.io/latest/concepts/tasks/) | [DataJob](https://datahubproject.io/docs/generated/metamodel/entities/datajob/) | ||||||||||
[Task Run](https://docs.prefect.io/latest/concepts/tasks/#tasks) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) | ||||||||||
[Task Tag](https://docs.prefect.io/latest/concepts/tasks/#tags) | [Tag](https://datahubproject.io/docs/generated/metamodel/entities/tag/) | ||||||||||
|
||||||||||
|
||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove extra blank line There are two consecutive blank lines here. -
+
ToolsMarkdownlint
|
||||||||||
## How to validate saved block and emit of metadata | ||||||||||
|
||||||||||
1. Go and check in Prefect UI at the Blocks menu if you can see the datahub emitter. | ||||||||||
2. Run a Prefect workflow. In the flow logs, you should see Datahub related log messages like: | ||||||||||
|
||||||||||
``` | ||||||||||
Emitting flow to datahub... | ||||||||||
Emitting tasks to datahub... | ||||||||||
``` | ||||||||||
## Debugging | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add blank lines around heading Headings should be surrounded by blank lines. - ## Debugging
+
+ ## Debugging
+ Committable suggestion
Suggested change
ToolsMarkdownlint
|
||||||||||
|
||||||||||
### Incorrect Prefect API URL | ||||||||||
|
||||||||||
If your Prefect API URL aren't being generated correctly or set incorrectly, then in that case you can set the Prefect API URL manually as show below: | ||||||||||
|
||||||||||
```shell | ||||||||||
prefect config set PREFECT_API_URL='http://127.0.0.1:4200/api' | ||||||||||
``` | ||||||||||
|
||||||||||
### Connection error for Datahub Rest URL | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add blank lines around heading. Headings should be surrounded by blank lines. - ### Connection error for Datahub Rest URL
+
+ ### Connection error for Datahub Rest URL
+ Committable suggestion
Suggested change
ToolsMarkdownlint
|
||||||||||
If you get ConnectionError: HTTPConnectionPool(host='localhost', port=8080), then in that case your GMS service is not up. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct spelling: "self-hosted"
The term "self hosted" should be hyphenated.
Committable suggestion
Tools
LanguageTool