iceberg table format support for filesystem destination #2067
base: devel
Conversation
- mypy upgrade needed to solve this issue: apache/iceberg-python#768
- uses <1.13.0 requirement on mypy because 1.13.0 gives an error
- new lint errors arising from the version upgrade are simply ignored
@rudolfix / @sh-rp this PR isn't there yet but can you give an intermediate review? What still needs to happen:
Notes:
IMO this is almost good.
- we need to decide if we want to save a per-table catalog.
- I'd enable the iceberg scanner in duckdb/sql_client in filesystem, mostly to start running the same tests that use delta.
- maybe the refactor of get_catalog I mentioned.
dlt/common/libs/pyiceberg.py (outdated)
"""Returns single-table, ephemeral, in-memory Iceberg catalog.""" | ||
|
||
# create in-memory catalog | ||
catalog = SqlCatalog( |
NOTE: how we get a catalog should be some kind of plugin, so we can easily plug Glue or REST catalogs into filesystem.
    table_id = f"{DLT_ICEBERG_NAMESPACE}.{table_name}"
    table_path = f"{client.dataset_path}/{table_name}"
    metadata_path = f"{table_path}/metadata"
    if client.fs_client.exists(metadata_path):
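The path layout in this snippet can be sketched in isolation. A minimal sketch, assuming a hypothetical `DLT_ICEBERG_NAMESPACE` value and a stub filesystem client standing in for `client.fs_client` (in the real code both come from dlt's filesystem destination):

```python
# Assumed constant for illustration; the real value lives in dlt's pyiceberg module.
DLT_ICEBERG_NAMESPACE = "dlt"


class StubFsClient:
    """Stands in for client.fs_client; knows which paths exist."""

    def __init__(self, existing_paths):
        self._existing = set(existing_paths)

    def exists(self, path: str) -> bool:
        return path in self._existing


def table_locations(dataset_path: str, table_name: str):
    # Mirrors the identifier/path construction shown in the diff above.
    table_id = f"{DLT_ICEBERG_NAMESPACE}.{table_name}"
    table_path = f"{dataset_path}/{table_name}"
    metadata_path = f"{table_path}/metadata"
    return table_id, table_path, metadata_path


fs = StubFsClient(["s3://bucket/dataset/events/metadata"])
table_id, table_path, metadata_path = table_locations("s3://bucket/dataset", "events")
print(table_id)               # dlt.events
print(fs.exists(metadata_path))  # True -> table was written before, register it
```

If the metadata directory exists, the table was written by a previous load and gets registered in the fresh catalog; otherwise it is created empty.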
is it possible to split the function that returns a catalog (even per table, so a new catalog is simply empty) from the function that does table maintenance (evolves the schema or creates an empty table)? IMO the code structure would be:

    with get_catalog() as catalog:
        create_or_evolve_table()
        write_data()
where catalog is pyiceberg's abstract Catalog class.
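The split proposed above can be sketched with stand-in classes. The names `get_catalog`, `create_or_evolve_table`, and `write_data` come from the comment; everything else (the in-memory catalog, the naive schema "evolution") is hypothetical scaffolding, not the PR's implementation:

```python
from contextlib import contextmanager


class InMemoryCatalog:
    """Stand-in for pyiceberg's abstract Catalog; tracks tables by name."""

    def __init__(self):
        self.tables = {}


@contextmanager
def get_catalog():
    # Returns a fresh, ephemeral catalog. A real implementation would yield
    # a pyiceberg Catalog (SqlCatalog, Glue, REST, ...) chosen via a plugin.
    catalog = InMemoryCatalog()
    try:
        yield catalog
    finally:
        pass  # close connections / clean up here


def create_or_evolve_table(catalog, name, schema):
    # Table maintenance only: create if missing, evolve the schema otherwise.
    tbl = catalog.tables.get(name)
    if tbl is None:
        catalog.tables[name] = {"schema": dict(schema), "rows": []}
    else:
        tbl["schema"].update(schema)  # naive "evolution": merge in new columns


def write_data(catalog, name, rows):
    # Pure write path, separated from catalog creation and maintenance.
    catalog.tables[name]["rows"].extend(rows)


with get_catalog() as catalog:
    create_or_evolve_table(catalog, "events", {"id": "long"})
    write_data(catalog, "events", [{"id": 1}])
```

The point of the structure is that catalog acquisition, schema maintenance, and data writing each become independently testable and replaceable.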
This reverts commit 54cd0bc.
code is good and ready to merge. we also have sufficient tests. we still need a little bit better docs.
pyiceberg.io:__init__.py:348 Defaulting to PyArrow FileIO
maybe we could hide this log? we set log levels during tests in pytest_configure.
- there are some tests that do not work in https://github.com/dlt-hub/dlt/actions/runs/12096164907/job/33730150792?pr=2067#step:8:4106 — please take a look
- we need help with improving the docs. my take would be to create a separate Delta / Iceberg destination page, where we could move most of the docs out of the filesystem page. WDYT?
- for both table formats you could briefly describe how they are implemented
- in the case of iceberg we should also describe how we write tables without a catalog and mention the limitations (single writer!)
- we have a page where we enumerate table formats. I'll write some kind of introduction there next week.
@@ -395,6 +422,13 @@ def prepare_load_table(self, table_name: str) -> PreparedTableSchema:
            if table["write_disposition"] == "merge":
                table["write_disposition"] = "append"
                table.pop("table_format", None)
        merge_strategy = resolve_merge_strategy(self.schema.tables, table, self.capabilities)
OK, probably we cannot avoid that. Time to add supported write dispositions and a selector to destination capabilities.
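The fallback in the hunk above can be sketched standalone: when merge is requested but no merge strategy is available, the load degrades to append rather than failing. Here `resolve_merge_strategy` is a simplified stand-in (the real function inspects the schema, the table, and destination capabilities):

```python
def resolve_merge_strategy(tables, table, capabilities=None):
    # Stand-in: pretend merge is only supported when the table uses a
    # table format (delta/iceberg); the real logic is capability-driven.
    return "delete-insert" if table.get("table_format") else None


def prepare_load_table(table, capabilities=None):
    # If merge was requested but no strategy resolves, fall back to append
    # and drop the table format hint, mirroring the diff above.
    if table["write_disposition"] == "merge":
        if resolve_merge_strategy({}, table, capabilities) is None:
            table["write_disposition"] = "append"
            table.pop("table_format", None)
    return table


t = prepare_load_table({"write_disposition": "merge"})
print(t["write_disposition"])  # append
```

Encoding "supported write dispositions" in destination capabilities, as suggested above, would let this fallback live in one generic place instead of per-destination code.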
    elif schema_table.get("table_format") == "iceberg":
        from dlt.common.libs.pyiceberg import _get_last_metadata_file

        self._setup_iceberg(self._conn)
isn't the latest version of duckdb iceberg working with it?
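For context, reading an Iceberg table from DuckDB goes through its `iceberg` extension; a rough sketch (the path is a placeholder, and exact behavior depends on the extension version, which is what the question above is about):

```sql
INSTALL iceberg;
LOAD iceberg;
-- read the table the filesystem destination wrote, straight from its metadata
SELECT * FROM iceberg_scan('s3://bucket/dataset/my_table', allow_moved_paths = true);
```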
closes #1996