From ba2f50a55e98a4bbf6d907b102f5d8d53a6e7380 Mon Sep 17 00:00:00 2001
From: Selena Chen
Date: Tue, 22 Oct 2024 11:03:49 -0700
Subject: [PATCH] Update self serve replication SQL to accept daily
 granularity as interval (#234)

## Summary

This PR adds support for daily granularity as a valid input for the `interval` parameter in the self serve replication SQL API. The following SQL is now valid and no longer throws an exception:

```
ALTER TABLE db.testTable SET POLICY (REPLICATION=({destination:'a', interval:1D}))
```

The `interval` parameter now accepts both hourly and daily inputs. Validation of the 'H' and 'D' values (accepting 12H and 1D/2D/3D inputs) continues to be performed server-side; the PR for that can be found [here](https://github.com/linkedin/openhouse/pull/227). A hedged sketch of that kind of check is included at the end of this description.

## Changes

- [x] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

## Testing Done

- [x] Manually tested on local docker setup. Please include commands run, and their output.
- [ ] Added new tests for the changes made.
- [x] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

Updated the unit tests for the SQL statements and tested in the local docker setup:

```
scala> spark.sql("ALTER TABLE u_tableowner.test SET POLICY (REPLICATION=({destination:'a', interval:1D}))")
res6: org.apache.spark.sql.DataFrame = []
```

```
scala> spark.sql("ALTER TABLE u_tableowner.test SET POLICY (REPLICATION=({destination:'a', interval:12H}))")
res8: org.apache.spark.sql.DataFrame = []
```

Using anything other than `h/H` or `d/D` throws an exception:

```
scala> spark.sql("ALTER TABLE u_tableowner.test SET POLICY (REPLICATION=({destination:'a', interval:1}))")
com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseParseException: no viable alternative at input 'interval:1'; line 1 pos 82
```

```
scala> spark.sql("ALTER TABLE u_tableowner.test SET POLICY (REPLICATION=({destination:'a', interval:1Y}))")
com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseParseException: no viable alternative at input 'interval:1Y'; line 1 pos 82
  at com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseParseErrorListener$.syntaxError(OpenhouseSparkSqlExtensionsParser.scala:123)
  at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)
```

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.
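To make the parser-side vs. server-side split concrete, here is a minimal Scala sketch of the kind of interval check performed on the server. This is an illustrative assumption only: the object and method names (`ReplicationIntervalValidator`, `isSupported`) are hypothetical, and the actual implementation lives in the linked PR (#227) and may differ.

```scala
// Hypothetical sketch only: the real server-side validation is in PR #227
// and may differ in names and structure.
object ReplicationIntervalValidator {
  // The parser accepts any <number>h/H or <number>d/D interval; the server
  // then narrows this down to the supported granularities.
  private val AllowedIntervals = Set("12H", "1D", "2D", "3D")

  def isSupported(interval: String): Boolean =
    AllowedIntervals.contains(interval.toUpperCase)
}

// Example: ReplicationIntervalValidator.isSupported("2d") returns true,
// while ReplicationIntervalValidator.isSupported("5D") returns false.
```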
---
 ...etTableReplicationPolicyStatementTest.java | 26 ++++---------------
 .../extensions/OpenhouseSqlExtensions.g4      |  1 +
 .../OpenhouseSqlExtensionsAstBuilder.scala    |  4 ++-
 3 files changed, 9 insertions(+), 22 deletions(-)

diff --git a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java
index 02138f06..db355f62 100644
--- a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java
+++ b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java
@@ -43,20 +43,20 @@ public void setupSpark() {
 
   @Test
   public void testSimpleSetReplicationPolicy() {
-    String replicationConfigJson = "[{\"destination\":\"a\", \"interval\":\"24H\"}]";
+    String replicationConfigJson = "[{\"destination\":\"a\", \"interval\":\"12H\"}]";
     Dataset<Row> ds =
         spark.sql(
             "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = "
-                + "({destination:'a', interval:24H}))");
+                + "({destination:'a', interval:12H}))");
     assert isPlanValid(ds, replicationConfigJson);
 
     // Test support with multiple clusters
     replicationConfigJson =
-        "[{\"destination\":\"a\", \"interval\":\"12H\"}, {\"destination\":\"aa\", \"interval\":\"12H\"}]";
+        "[{\"destination\":\"a\", \"interval\":\"12H\"}, {\"destination\":\"aa\", \"interval\":\"2D\"}]";
     ds =
         spark.sql(
             "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = "
-                + "({destination:'a', interval:12h}, {destination:'aa', interval:12H}))");
+                + "({destination:'a', interval:12h}, {destination:'aa', interval:2d}))");
     assert isPlanValid(ds, replicationConfigJson);
   }
 
@@ -187,7 +187,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
                 "ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({destination: 'aa', interval: '12h'}))")
             .show());
 
-    // Interval input does not follow 'h/H' format
+    // Interval input does not follow 'h/H' or 'd/D' format
     Assertions.assertThrows(
         OpenhouseParseException.class,
         () ->
@@ -196,22 +196,6 @@ public void testReplicationPolicyWithoutProperSyntax() {
                     "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '12'}))")
                 .show());
 
-    Assertions.assertThrows(
-        OpenhouseParseException.class,
-        () ->
-            spark
-                .sql(
-                    "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '1D'}))")
-                .show());
-
-    Assertions.assertThrows(
-        OpenhouseParseException.class,
-        () ->
-            spark
-                .sql(
-                    "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '12d'}))")
-                .show());
-
     // Missing cluster and interval values
     Assertions.assertThrows(
         OpenhouseParseException.class,

diff --git a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4
index 67fa6585..435a77e6 100644
--- a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4
+++ b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4
@@ -102,6 +102,7 @@ replicationPolicyClusterClause
 
 replicationPolicyIntervalClause
     : INTERVAL ':' RETENTION_HOUR
+    | INTERVAL ':' RETENTION_DAY
    ;
 
 columnRetentionPolicyPatternClause

diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala
index 408c9cf3..0619f834 100644
--- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala
+++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala
@@ -115,7 +115,9 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
   }
 
   override def visitReplicationPolicyIntervalClause(ctx: ReplicationPolicyIntervalClauseContext): (String) = {
-    ctx.RETENTION_HOUR().getText.toUpperCase
+    if (ctx.RETENTION_HOUR() != null)
+      ctx.RETENTION_HOUR().getText.toUpperCase()
+    else ctx.RETENTION_DAY().getText.toUpperCase()
  }
 
   override def visitColumnRetentionPolicy(ctx: ColumnRetentionPolicyContext): (String, String) = {