
Commit dfafcb5
Merge branch 'main' of github.com:astronomer/Astro-sdk into airflow-provider-rc-testing

taking latest
vatsrahul1001 committed Sep 15, 2023
2 parents 641cda3 + 0fe2054 commit dfafcb5
Showing 12 changed files with 209 additions and 218 deletions.
201 changes: 12 additions & 189 deletions .github/workflows/ci-rc-test.yaml
@@ -30,192 +30,15 @@ defaults:
    working-directory: python-sdk

jobs:
  check-rc-testing-announcement:
    runs-on: 'ubuntu-20.04'
    if: github.event_name == 'schedule'
    env:
      GH_TOKEN: ${{ github.token }}
    steps:
      - name: Checkout apache-airflow
        uses: actions/checkout@v3
        with:
          repository: 'apache/airflow'

      - name: Parse the latest 100 GitHub issues from apache-airflow to check providers testing announcement
        id: parse-airflow-gh-issues
        run: |
          # The default limit is 30. Set it to 100 for retrieving more issues
          rc_issue_url=`gh issue list \
              --json createdAt,title,url \
              --limit 100 \
              --jq 'map(
                  select(
                      .title |
                      contains ("Status of testing Providers that were prepared on ")
                  )
              ) | .[0].url'`
          echo "rc_issue_url=$rc_issue_url" >> $GITHUB_OUTPUT

      - name: Checkout current repo
        uses: actions/checkout@v3
        if: steps.parse-airflow-gh-issues.outputs.rc_issue_url != ''

      - name: Parse the latest GitHub pull requests for checking existing RC provider testing pull request
        id: parse-current-repo
        if: steps.parse-airflow-gh-issues.outputs.rc_issue_url != ''
        run: |
          # The default limit is 30. Set it to 100 for retrieving more pull requests
          rc_issue_url="${{ steps.parse-airflow-gh-issues.outputs.rc_issue_url }}"
          jq_query="map(
              select(.title == \"[DO NOT MERGE] Test RC provider packages for $rc_issue_url\")
          ) | .[0].url"
          testing_pr_url=`gh pr list \
              --json createdAt,title,url \
              --limit 100 \
              --state all \
              --jq "$jq_query"`
          echo "testing_pr_url=$testing_pr_url" >> $GITHUB_OUTPUT

      - name: Export rc_issue_url
        id: export-rc-issue-url
        run: |
          rc_issue_url="${{ steps.parse-airflow-gh-issues.outputs.rc_issue_url }}"
          testing_pr_url="${{ steps.parse-current-repo.outputs.testing_pr_url }}"
          if [ "$rc_issue_url" == "" ] ; then
              echo "No RC providers testing announcement found on apache-airflow"
          elif [ "$testing_pr_url" != "" ] ; then
              echo "Branch for testing RC providers has been created"
              rc_issue_url=""
          fi
          echo "rc_issue_url=$rc_issue_url" >> $GITHUB_OUTPUT
    outputs:
      rc_issue_url: ${{ steps.export-rc-issue-url.outputs.rc_issue_url }}

  validate-manual-input:
    runs-on: 'ubuntu-20.04'
    if: github.event_name == 'workflow_dispatch'
    steps:
      - name: Validate user input
        if: |
          (inputs.rc_testing_branch == '' && inputs.issue_url == '') ||
          (inputs.rc_testing_branch != '' && inputs.issue_url != '')
        run: |
          echo "Either rc_testing_branch or issue_url is required, and you cannot give both."
          exit 1

  create-branch-for-testing-rc-release:
    needs: [validate-manual-input, check-rc-testing-announcement]
    runs-on: 'ubuntu-20.04'
    if: |
      always() &&
      (
        (github.event_name == 'workflow_dispatch' && inputs.issue_url != '') ||
        (github.event_name == 'schedule' && needs.check-rc-testing-announcement.outputs.rc_issue_url != '')
      )
    steps:
      - name: Checkout
        uses: actions/checkout@v3
        with:
          ref: ${{ inputs.base_git_rev }}

      - name: Install dev dependency
        run: |
          python3 -m pip install -r dev/integration_test_scripts/requirements.txt

      - name: Setup GitHub Actions git user
        run: |
          git config --global user.email "[email protected]"
          git config --global user.name "GitHub Actions"

      - name: Export GitHub RC provider testing url
        id: export-rc-issue-url
        run: |
          if [ "${{ inputs.issue_url }}" != "" ] ; then
              rc_issue_url="${{ inputs.issue_url }}"
          else
              rc_issue_url="${{ needs.check-rc-testing-announcement.outputs.rc_issue_url }}"
          fi
          echo "rc_issue_url=$rc_issue_url"
          echo "rc_issue_url=$rc_issue_url" >> $GITHUB_OUTPUT

      - name: Update project dependencies to use RC providers
        run: |
          rc_issue_url="${{ steps.export-rc-issue-url.outputs.rc_issue_url }}"
          echo "Updating pyproject.toml with RC provider packages on $rc_issue_url"
          python3 dev/integration_test_scripts/replace_dependencies.py --issue-url $rc_issue_url

      - name: Check repo providers updated
        id: check-repo-providers-updated
        run: |
          difference=`git diff`
          if [ -z "$difference" ]
          then
              echo "No provider changed"
              echo "no_provider_changed=true" >> $GITHUB_OUTPUT
          else
              echo "$difference"
          fi

      - name: Create RC branch
        id: create_rc_branch
        run: |
          if [ "${{ steps.check-repo-providers-updated.outputs.no_provider_changed }}" != "true" ]
          then
              current_timestamp=`date +%Y-%m-%dT%H-%M-%S%Z`
              echo "Current timestamp is" $current_timestamp
              branch_name="rc-test-$current_timestamp"
              git checkout -b $branch_name
          else
              branch_name=""
          fi
          echo "rc_testing_branch=$branch_name"
          echo "rc_testing_branch=$branch_name" >> $GITHUB_OUTPUT

      - name: Commit changes and create a pull request
        if: steps.create_rc_branch.outputs.rc_testing_branch != ''
        env:
          GH_TOKEN: ${{ github.token }}
        run: |
          rc_issue_url="${{ steps.export-rc-issue-url.outputs.rc_issue_url }}"
          git add pyproject.toml
          git commit -m "Updating pyproject.toml with RC provider packages on $rc_issue_url"
          git push origin ${{ steps.create_rc_branch.outputs.rc_testing_branch }}
          gh pr create --title "[DO NOT MERGE] Test RC astro-sdk packages for $rc_issue_url" \
              --fill
          echo "git_rev=$(git rev-parse HEAD)" >> $GITHUB_OUTPUT
    outputs:
      rc_testing_branch: ${{ steps.create_rc_branch.outputs.rc_testing_branch }}

  export-rc-testing-branch-name:
    needs: [validate-manual-input, create-branch-for-testing-rc-release]
    if: |
      always() &&
      (
        needs.create-branch-for-testing-rc-release.result == 'success' &&
        needs.create-branch-for-testing-rc-release.outputs.rc_testing_branch != ''
      ) ||
      (
        needs.validate-manual-input.result == 'success' &&
        inputs.rc_testing_branch
      )
    runs-on: 'ubuntu-20.04'
    steps:
      - name: export rc_testing_branch
        id: export-rc-testing-branch-name-step
        run: |
          if [ "${{ inputs.rc_testing_branch }}" == "" ]; then
              rc_testing_branch=${{ needs.create-branch-for-testing-rc-release.outputs.rc_testing_branch }}
          else
              rc_testing_branch=${{ inputs.rc_testing_branch }}
          fi
          echo "rc_testing_branch=$rc_testing_branch" >> $GITHUB_OUTPUT
    outputs:
      rc_testing_branch: ${{ steps.export-rc-testing-branch-name-step.outputs.rc_testing_branch }}
  check-airflow-provider-rc-release:
    uses: astronomer/astronomer-providers/.github/workflows/reuse-wf-check-rc-release.yaml@main
    with:
      rc_testing_branch: ${{ inputs.rc_testing_branch }}
      issue_url: ${{ inputs.issue_url }}
      base_git_rev: ${{ inputs.base_git_rev }}
      git_email: "[email protected]"
      git_username: "airflow-oss-bot"
      working_directory: "python-sdk"
    secrets:
      BOT_ACCESS_TOKEN: ${{ secrets.BOT_ACCESS_TOKEN }}
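For reference, the jobs above read `inputs.rc_testing_branch`, `inputs.issue_url`, and `inputs.base_git_rev`, and branch on the `schedule` and `workflow_dispatch` events. The workflow's trigger block sits above this hunk and is not shown; below is a minimal sketch of what those references imply, with the cron cadence and the comments assumed rather than taken from the diff:

on:
  schedule:
    - cron: "0 0 * * *"  # assumed cadence; the real schedule is outside this hunk
  workflow_dispatch:
    inputs:
      rc_testing_branch:  # existing RC-testing branch to reuse (mutually exclusive with issue_url)
        required: false
      issue_url:  # apache-airflow RC provider testing announcement issue
        required: false
      base_git_rev:  # base git revision to branch from
        required: false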
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -68,7 +68,7 @@ repos:
         additional_dependencies: [black>=22.10.0]

   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: 'v0.0.285'
+    rev: 'v0.0.287'
     hooks:
       - id: ruff
         args:
29 changes: 26 additions & 3 deletions python-sdk/docs/CHANGELOG.md
@@ -1,13 +1,36 @@
 # Changelog

-## 1.7.0a3
+## 1.7.0

 ### Feature
-- Allow users to disable schema check and creation on `transform` [#1925](https://github.com/astronomer/astro-sdk/pull/1925)
 - Allow users to disable schema check and creation on `load_file` [#1922](https://github.com/astronomer/astro-sdk/pull/1922)
+- Allow users to disable schema check and creation on `transform` [#1925](https://github.com/astronomer/astro-sdk/pull/1925)
+- Add support for Excel files [#1978](https://github.com/astronomer/astro-sdk/pull/1978)
+- Support loading metadata columns from stage into table for Snowflake [#2023](https://github.com/astronomer/astro-sdk/pull/2023)

-### Bug fixes
+### Bug Fixes
 - Add `openlineage_dataset_uri` in databricks [#1919](https://github.com/astronomer/astro-sdk/pull/1919)
 - Fix QueryModifier issue on Snowflake [#1962](https://github.com/astronomer/astro-sdk/pull/1962)
 - Fix AstroCustomXcomBackend circular import issue [#1943](https://github.com/astronomer/astro-sdk/pull/1943)

+### Misc
+- Add an example DAG for using dynamic task with dataframe [#1912](https://github.com/astronomer/astro-sdk/pull/1912)
+- Improve `example_load_file` DAG tasks names [#1958](https://github.com/astronomer/astro-sdk/pull/1958)
+- Limit `databricks-sql-connector<2.9.0` [#2013](https://github.com/astronomer/astro-sdk/pull/2013)
+
+### Docs
+- Add docs about using dtype [#1903](https://github.com/astronomer/astro-sdk/pull/1903)
+- Make cleanup operator summary docs smaller [#2017](https://github.com/astronomer/astro-sdk/pull/2017)
+
+
+## 1.6.2
+
+### Bug Fixes
+
+- Fix Snowflake QueryModifier issue [#1962](https://github.com/astronomer/astro-sdk/pull/1962)
+- Add support for Pandas 2, Airflow 2.6.3 and Python 3.11 [#1989](https://github.com/astronomer/astro-sdk/pull/1989)
+- Update the WASB connection [#1994](https://github.com/astronomer/astro-sdk/pull/1994)
+
+
 ## 1.6.1

23 changes: 23 additions & 0 deletions python-sdk/example_dags/example_load_file.py
@@ -388,4 +388,27 @@
    )
    # [END load_file_example_28]

    # [START load_file_example_29]
    aql.load_file(
        task_id="s3_to_snowflake_native_with_metadata_columns",
        input_file=File("s3://astro-sdk/python_sdk/example_dags/data/sample.csv", conn_id=AWS_CONN_ID),
        output_table=Table(
            conn_id=SNOWFLAKE_CONN_ID,
        ),
        load_options=[
            SnowflakeLoadOptions(
                file_options={"SKIP_HEADER": 1, "SKIP_BLANK_LINES": True},
                copy_options={"ON_ERROR": "CONTINUE"},
                metadata_columns=[
                    "METADATA$FILENAME",
                    "METADATA$FILE_ROW_NUMBER",
                    "METADATA$FILE_CONTENT_KEY",
                    "METADATA$FILE_LAST_MODIFIED",
                    "METADATA$START_SCAN_TIME",
                ],
            )
        ],
    )
    # [END load_file_example_29]

    aql.cleanup()
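The new example depends on imports and connection constants defined near the top of example_load_file.py, outside this hunk. A minimal sketch of that setup follows, with the import paths and default connection IDs assumed rather than taken from the diff:

import os

from astro import sql as aql
from astro.files import File
from astro.options import SnowflakeLoadOptions
from astro.table import Table

# Connection IDs in the example DAGs are typically read from the environment (defaults assumed here).
AWS_CONN_ID = os.getenv("AWS_CONN_ID", "aws_default")
SNOWFLAKE_CONN_ID = os.getenv("SNOWFLAKE_CONN_ID", "snowflake_default")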
67 changes: 67 additions & 0 deletions python-sdk/src/astro/databases/snowflake.py
@@ -260,6 +260,14 @@ class SnowflakeDatabase(BaseDatabase):
    )
    DEFAULT_SCHEMA = SNOWFLAKE_SCHEMA

    METADATA_COLUMNS_DATATYPE = {
        "METADATA$FILENAME": "VARCHAR",
        "METADATA$FILE_ROW_NUMBER": "NUMBER",
        "METADATA$FILE_CONTENT_KEY": "VARCHAR",
        "METADATA$FILE_LAST_MODIFIED": "TIMESTAMP_NTZ",
        "METADATA$START_SCAN_TIME": "TIMESTAMP_LTZ",
    }

    def __init__(
        self,
        conn_id: str = DEFAULT_CONN_ID,
@@ -460,6 +468,21 @@ def is_native_autodetect_schema_available(  # skipcq: PYL-R0201
        )
        return is_file_type_supported and is_file_location_supported

    def create_table(self, table: BaseTable, *args, **kwargs):
        """Override create_table to add metadata columns to the table if specified in load_options"""
        super().create_table(table, *args, **kwargs)
        if self.load_options is None or not self.load_options.metadata_columns:
            return
        table_name = self.get_table_qualified_name(table)
        metadata_columns = ",".join(
            [
                "IF NOT EXISTS " + f"{col} {self.METADATA_COLUMNS_DATATYPE[col]}"
                for col in self.load_options.metadata_columns
            ]
        )
        sql_statement = f"ALTER TABLE {table_name} ADD COLUMN {metadata_columns};"
        self.hook.run(sql_statement, autocommit=True)
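    # For illustration (not part of the commit): assuming load_options.metadata_columns
    # is ["METADATA$FILENAME", "METADATA$FILE_ROW_NUMBER"], the statement built above reads:
    #   ALTER TABLE <db>.<schema>.<table> ADD COLUMN
    #   IF NOT EXISTS METADATA$FILENAME VARCHAR,IF NOT EXISTS METADATA$FILE_ROW_NUMBER NUMBER;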

def create_table_using_native_schema_autodetection(
self,
table: BaseTable,
@@ -622,13 +645,57 @@ def load_file_to_table_natively(
        )
        self.evaluate_results(rows)

    def _get_table_columns_count(self, table_name: str) -> int:
        """Return the number of columns in a table."""
        sql_statement = (
            "SELECT count(*) COLUMN_COUNT from INFORMATION_SCHEMA.columns where table_name=%(table_name)s"
        )
        try:
            table_columns_count = int(
                self.hook.run(
                    sql_statement, parameters={"table_name": table_name}, handler=lambda cur: cur.fetchone()
                )[0]
            )
        except AttributeError:  # pragma: no cover
            # For apache-airflow-providers-snowflake <3.2.0.
            # Versions >=3.2.0 have the handler param in the run method due to the move to the common.sql
            # provider. However, versions <3.2 raise an AttributeError 'sfid' if we pass the handler.
            try:
                table_columns_count = int(
                    self.hook.run(sql_statement, parameters={"table_name": table_name})[0]["COLUMN_COUNT"]
                )
            except ValueError as exe:
                raise DatabaseCustomError from exe
        return table_columns_count
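    # For illustration: the query above yields a single row, so for a table with four
    # columns the handler returns (4,) and table_columns_count == 4; the pre-3.2.0
    # fallback reads the same value from the row's "COLUMN_COUNT" key instead.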

    def _get_copy_into_with_metadata_sql_statement(
        self, file_path: str, target_table: BaseTable, stage: SnowflakeStage
    ) -> str:
        """Return the sql statement for copy into with metadata columns."""
        if self.load_options is None or not self.load_options.metadata_columns:
            raise ValueError("Error: Requires metadata columns to be set in load options")
        table_name = target_table.name.upper()
        table_columns_count = self._get_table_columns_count(table_name)
        non_metadata_columns_count = table_columns_count - len(self.load_options.metadata_columns)
        select_non_metadata_columns = [f"${col}" for col in range(1, non_metadata_columns_count + 1)]
        select_columns_list = select_non_metadata_columns + self.load_options.metadata_columns
        select_columns = ",".join(select_columns_list)
        sql_statement = (
            f"COPY INTO {table_name} "
            "FROM "
            f"(SELECT {select_columns} FROM @{stage.qualified_name}/{file_path}) "
        )
        return sql_statement
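    # For illustration (not part of the commit): assuming a target table named TBL with
    # three data columns plus metadata_columns=["METADATA$FILENAME"] (four columns in
    # total), the statement built above is:
    #   COPY INTO TBL FROM (SELECT $1,$2,$3,METADATA$FILENAME FROM @<stage>/<file>)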

    def _copy_into_table_from_stage(self, source_file, target_table, stage):
        table_name = self.get_table_qualified_name(target_table)
        file_path = os.path.basename(source_file.path) or ""
        sql_statement = f"COPY INTO {table_name} FROM @{stage.qualified_name}/{file_path}"

        self._validate_before_copy_into(source_file, target_table, stage)

        if self.load_options is not None and self.load_options.metadata_columns:
            sql_statement = self._get_copy_into_with_metadata_sql_statement(file_path, target_table, stage)
        # The code below is needed due to a breaking change in apache-airflow-providers-snowflake==3.2.0:
        # we need to pass the handler param to get the rows, but in apache-airflow-providers-snowflake==3.1.0
        # passing the handler makes the provider raise an AttributeError.
