HIVE-28586 Support write order for Iceberg tables at CREATE TABLE #5541
base: master
Conversation
@init { pushMsg("table sorted specification", state); }
@after { popMsg(state); }
  :
  KW_WRITE KW_ORDERED KW_BY sortCols=columnNameOrderList
Is this syntax equivalent to the Spark-Iceberg alter-table-write-ordered-by?
https://iceberg.apache.org/docs/1.7.0/spark-ddl/#alter-table-write-ordered-by
Are you also considering adding the alter-table-write-ordered-by syntax, so that users can alter an existing Iceberg table to set a write order?
BTW, Trino added write sorting through a table property:
https://trino.io/docs/current/connector/iceberg.html#sorted-tables
https://www.starburst.io/blog/improving-performance-with-iceberg-sorted-tables/
CREATE TABLE catalog_sales_sorted (
  cs_sold_date_sk bigint,
  more columns...
)
WITH (
  format = 'PARQUET',
  sorted_by = ARRAY['cs_sold_date_sk'],
  type = 'ICEBERG'
)
This would avoid introducing new CREATE syntax. In any case, we should settle on a simple syntax that users can work with.
I also want to hear other folks' opinions about the syntax.
cc @ayushtkn @deniskuzZ @okumin
Disclaimer: I might be misunderstanding the intention of this PR.
Are the semantics of Iceberg's sort order different from those of Hive's SORTED BY on bucketed, sorted tables? If they are the same, we could reuse it; they may well be different, though. I also feel that global ordering is hard to achieve with the current Hive.
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
The CLUSTERED BY and SORTED BY creation commands do not affect how data is inserted into a table – only how it is read. This means that users must be careful to insert data correctly by specifying the number of reducers to be equal to the number of buckets, and using CLUSTER BY and SORT BY commands in their query.
@okumin, this PR intends to add support for Iceberg sorted tables, the same as in the Trino link @zhangbutao added. I don't really like that we have to create new syntax for that.
SORTED BY kind of makes sense, but it would be overloaded for Iceberg, since there it would specify the data insert order.
Spark only supports alter-table-write-ordered-by to define the write order.
Maybe we could have a combination of the ALTER and Trino table-property styles?
I understand that Iceberg's sort order is a hint for writing:
- Table Spec says, "Writers should use this default sort order to sort the data on write, but are not required to if the default order is prohibitively expensive, as it would be for streaming writes."
- Spark DDL says, "Table write order does not guarantee data order for queries. It only affects how data is written to the table."
- Trino has a toggle, sorted_writing_enabled, to obey the hint
So, I agree that reusing SORTED BY is confusing. I'm also checking what syntaxes are used in other engines...
@zratkai you have created the ticket HIVE-28587, which means you will implement the ALTER TABLE syntax like the Spark one (https://iceberg.apache.org/docs/1.7.0/spark-ddl/#alter-table-write-ordered-by), right?
This syntax comes from the official Iceberg recommendation, which is for ALTER TABLE; to be consistent, I changed ALTER to CREATE, so at CREATE TABLE it is the same as for ALTER TABLE:
I understand why you chose this CREATE syntax. But since there is no uniform CREATE syntax for the write order, I don't think we need to keep the CREATE syntax consistent with the ALTER syntax. If the table property is simpler, I prefer CREATE plus a table property, like Trino.
If you think CREATE plus a table property is also not user-friendly, I am not against the current implementation; you just need to update the wiki doc to describe the CREATE dialect for the Iceberg write order.
If there is no common agreement, we can reach out to the dev mailing list and proceed with what the majority of folks like. This syntax discussion is mostly a matter of personal preference, I believe, about what each of us finds more convenient, and there is no way we can conclude which is better like this.
I was reading this doc:
https://www.tabular.io/apache-iceberg-cookbook/data-engineering-table-write-order/
It had something like
ALTER TABLE logs WRITE LOCALLY ORDERED BY level, event_ts
&
here:
https://iceberg.apache.org/docs/latest/spark-ddl/#alter-table-write-ordered-by
ALTER TABLE prod.db.sample WRITE LOCALLY ORDERED BY category, id
If LOCALLY isn't specified, it is a global sort order, as the doc says: 'To order within each task, not across tasks, use LOCALLY ORDERED BY:'. I believe this PR only supports the local sort order, not the global one.
So this looks fine to me. We could use the same suffix in the CREATE command, something like "WRITE LOCALLY ORDERED BY level, event_ts", which would be in sync with the ALTER command and the docs above.
Again, just my preference; if we can't all agree on one choice, we can fall back to dev@ and go with what the majority finds easier on the eyes.
@ayushtkn thanks Ayush for the comment!
I agree with you and @deniskuzZ, LOCALLY ORDERED BY is the right choice here.
@okumin @zhangbutao if you can accept that, I will modify it accordingly.
It's fine with me. 😃 Just do it. We can optimize this later if someone has a better idea in the future.
I give +1 to WRITE LOCALLY ORDERED BY.
I double-checked WRITE ORDERED BY vs WRITE LOCALLY ORDERED BY with Spark 3.5.1 and Iceberg 1.6.1.
spark-sql (default)> CREATE TABLE hadoop_prod.default.test2 (a int) USING iceberg;
Time taken: 0.089 seconds
spark-sql (default)> ALTER TABLE hadoop_prod.default.test2 WRITE ORDERED BY a;
Time taken: 0.182 seconds
spark-sql (default)> CREATE TABLE hadoop_prod.default.test3 (a int) USING iceberg;
Time taken: 0.086 seconds
spark-sql (default)> ALTER TABLE hadoop_prod.default.test3 WRITE LOCALLY ORDERED BY a;
This is the diff.
zookage@client-node-0:~$ hdfs dfs -cat /user/hive/warehouse/catalog/default/test2/metadata/v2.metadata.json > /tmp/test2.json
zookage@client-node-0:~$ hdfs dfs -cat /user/hive/warehouse/catalog/default/test3/metadata/v2.metadata.json > /tmp/test3.json
zookage@client-node-0:~$ diff /tmp/test2.json /tmp/test3.json
3,4c3,4
< "table-uuid" : "821c3cc2-1320-45dc-bb2a-c805778caa91",
< "location" : "hdfs://hdfs-namenode-0.hdfs-namenode:8020/user/hive/warehouse/catalog/default/test2",
---
> "table-uuid" : "094951f1-6229-43cd-bffe-cb2f086b8dda",
> "location" : "hdfs://hdfs-namenode-0.hdfs-namenode:8020/user/hive/warehouse/catalog/default/test3",
6c6
< "last-updated-ms" : 1733994463428,
---
> "last-updated-ms" : 1733994491871,
40c40
< "write.distribution-mode" : "range",
---
> "write.distribution-mode" : "none",
50,51c50,51
< "timestamp-ms" : 1733994460005,
< "metadata-file" : "hdfs://hdfs-namenode-0.hdfs-namenode:8020/user/hive/warehouse/catalog/default/test2/metadata/v1.metadata.json"
---
> "timestamp-ms" : 1733994472725,
> "metadata-file" : "hdfs://hdfs-namenode-0.hdfs-namenode:8020/user/hive/warehouse/catalog/default/test3/metadata/v1.metadata.json"
It looks like the only meaningful difference is write.distribution-mode=range vs write.distribution-mode=none. I guess adding LOCALLY makes more sense unless we also set range.
I am not a specialist in Apache Spark, so please feel free to correct me if I am wrong.
@@ -119,6 +119,7 @@ public static Collection<String> data() {
  "ONLY",
  "OR",
  "ORDER",
  "ORDERED",
I would like to include it in the non-reserved words (nonReserved) because ORDERED is not listed among the reserved words of SQL:2023.
@@ -88,6 +88,7 @@ private InputFormatConfig() {
  public static final String CATALOG_CLASS_TEMPLATE = "iceberg.catalog.%s.catalog-impl";
  public static final String CATALOG_DEFAULT_CONFIG_PREFIX = "iceberg.catalog-default.";
  public static final String QUERY_FILTERS = "iceberg.query.filters";
  public static final String INSERT_WRITE_ORDER = "iceberg.write-order";
can't we use iceberg config: TableProperties.DEFAULT_SORT_ORDER?
void setSortOrder(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.DEFAULT_SORT_ORDER);
if (exposeInHmsProperties()
&& metadata.sortOrder() != null
&& metadata.sortOrder().isSorted()) {
String sortOrder = SortOrderParser.toJson(metadata.sortOrder());
setField(parameters, TableProperties.DEFAULT_SORT_ORDER, sortOrder);
}
}
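To make the suggestion concrete, here is a minimal sketch (not part of this PR) of how the CREATE path could serialize the requested order under Iceberg's existing TableProperties.DEFAULT_SORT_ORDER key instead of a new iceberg.write-order config; the helper and class names are hypothetical, and only the SortOrderParser/TableProperties calls already shown in the snippet above are assumed:

import java.util.Properties;

import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableProperties;

final class ReuseIcebergSortOrderKey {

  // Hypothetical variant of the PR's addSortOrder(): store the requested write
  // order under Iceberg's own default-sort-order property, so reads and writes
  // share one key instead of introducing InputFormatConfig.INSERT_WRITE_ORDER.
  static void putWriteOrder(SortOrder sortOrder, Properties catalogProperties) {
    if (sortOrder != null && sortOrder.isSorted()) {
      catalogProperties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrder));
    }
  }
}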
}

private static SortOrder getSortOrder(Properties props, Schema schema) {
  String sortOrderJsonString = props.getProperty(InputFormatConfig.INSERT_WRITE_ORDER);
sortOrderAsJson?
I think I would use "asJson" in a method name, to express that a transformation is happening (like toString(), asJson(), asXml(), etc.), not in a variable name.
I don't insist; I just saw this naming used in Iceberg.
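For context on what that JSON value carries, here is a minimal sketch (not the PR's code) of how the stored string can be turned back into a SortOrder on the read path; it assumes SortOrderParser offers fromJson(schema, json) as the counterpart of toJson, and the helper and its key parameter are hypothetical:

import java.util.Properties;

import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;

final class SortOrderFromProps {

  // Hypothetical mirror of the PR's getSortOrder(): parse the JSON written by
  // SortOrderParser.toJson() back against the table schema; a missing property
  // simply means the table is unsorted.
  static SortOrder read(Properties props, Schema schema, String propertyKey) {
    String json = props.getProperty(propertyKey);
    return json == null ? SortOrder.unsorted() : SortOrderParser.fromJson(schema, json);
  }
}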
private void addSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema,
    Properties properties) {
  SortOrder.Builder sortOderBuilder = SortOrder.builderFor(schema);
  hmsTable.getSd().getSortCols().forEach(item -> {
item -> field?
Iceberg supports expressions in the sort order, like sort by day(time); can we add support for that as well?
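To make the ask concrete, here is a minimal sketch of what a transform-based sort order such as day(time) looks like when built with Iceberg's SortOrder.Builder and Expressions APIs; the schema and column names are made up for illustration:

import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;

final class TransformSortOrderSketch {

  // Builds the equivalent of "write ordered by day(time) asc nulls first, id desc nulls last":
  // a transform term (day) combined with a plain identity column.
  static SortOrder example() {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "time", Types.TimestampType.withZone()));
    return SortOrder.builderFor(schema)
        .asc(Expressions.day("time"), NullOrder.NULLS_FIRST)
        .desc("id", NullOrder.NULLS_LAST)
        .build();
  }
}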
hmsTable.getSd().getSortCols().forEach(item -> {
  NullOrder nullOrder = item.getNullOrdering() == NullOrderingType.NULLS_FIRST ?
      NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
  if (item.getOrder() == 0) {
sortOderBuilder.sortBy(
    item.getCol(), item.getOrder() == 0 ? SortDirection.DESC : SortDirection.ASC, nullOrder);
  addSortOrder(hmsTable, schema, catalogProperties);
}

private void addSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema,
setSortOrder?
@@ -781,7 +802,7 @@ private void setCommonHmsTablePropertiesForIceberg(org.apache.hadoop.hive.metast
 * @param hmsTable Table for which we are calculating the properties
 * @return The properties we can provide for Iceberg functions, like {@link Catalogs}
 */
private static Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
private Properties getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
why static was removed?
set hive.optimize.shared.work.merge.ts.schema=true;

create table ice_orc_sorted (id int, text string) write ordered by id desc nulls first, text asc nulls last stored by iceberg stored as orc;
Should we specify the sort column/expressions in parentheses?
@@ -31,6 +31,7 @@ Table Parameters:
  current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"i\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"s\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"ts\",\"required\":false,\"type\":\"timestamp\"},{\"id\":4,\"name\":\"d\",\"required\":false,\"type\":\"date\"}]}
  format-version 2
  iceberg.orc.files.only true
  iceberg.write-order {\"order-id\":0,\"fields\":[]}
Should we add this if no order is provided? I think that config shouldn't even be set.
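A minimal sketch of the guard being suggested, assuming the Iceberg SortOrder API; the surrounding helper is hypothetical and only illustrates skipping the property when the order is the default unsorted one (order-id 0, no fields):

import java.util.Map;

import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;

final class WriteOrderPropertyGuard {

  // Only persist a write order that was actually specified; for an unsorted
  // order the property is left out entirely instead of storing empty JSON.
  static void maybeSetWriteOrder(Map<String, String> props, String key, SortOrder sortOrder) {
    if (sortOrder == null || sortOrder.isUnsorted()) {
      props.remove(key);
      return;
    }
    props.put(key, SortOrderParser.toJson(sortOrder));
  }
}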
@after { popMsg(state); }
  :
  KW_WRITE KW_ORDERED KW_BY sortCols=columnNameOrderList
  -> ^(TOK_ALTERTABLE_BUCKETS $sortCols?)
we should support iceberg transform expressions as well, not just identity
numBuckets = Integer.parseInt(child.getChild(2).getText());
switch(child.getChildCount()){
  case 1:
    sortCols = getColumnNamesOrder((ASTNode) child.getChild(0));
why it's under TOK_ALTERTABLE_BUCKETS?
@@ -14444,6 +14453,22 @@ ASTNode analyzeCreateTable(
    return null;
  }

  private void checkSortCols(List<Order> sortCols, StorageFormat storageFormat) throws SemanticException{
    if("org.apache.iceberg.mr.hive.HiveIcebergStorageHandler".equalsIgnoreCase(storageFormat.getStorageHandler())){
nit: space
@@ -281,6 +281,11 @@ enum DatabaseType {
  REMOTE = 2
}

enum NullOrderingType {
- Do we store it in the HMS API? Why the changes in Thrift?
- Didn't we already have any order abstractions in Hive?
Change-Id: Ia9a0a92d19d33693887137c797e0662088a314db
What changes were proposed in this pull request?
Support for write order in Iceberg tables, for example:
CREATE TABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST
Why are the changes needed?
To support write ordering in Iceberg tables.
Does this PR introduce any user-facing change?
Yes, users can now use this syntax:
CREATE TABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST;
Is the change a dependency upgrade?
No.
How was this patch tested?
With qtest.