# Update self serve replication SQL to accept daily granularity as interval (#234)

## Summary


This PR adds support for daily granularity as a valid input for the
`interval` parameter in the SQL API for self-serve replication.

Now the following SQL is valid and will not throw an exception:
```
ALTER TABLE db.testTable SET POLICY (REPLICATION=({destination:'a', interval:1D}))
```
where `interval` accepts daily and hourly inputs.
Validation of the 'D' and 'H' inputs will continue to be performed
server-side, which accepts 12H and 1/2/3D values. The PR for that
can be found [here](#227).
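
For illustration, here is a minimal sketch of that server-side interval check; `isValidReplicationInterval` is a hypothetical name, not part of the OpenHouse codebase:

```scala
// Hedged sketch of the server-side validation described above:
// hourly intervals must be 12H; daily intervals must be 1D, 2D, or 3D.
def isValidReplicationInterval(interval: String): Boolean =
  interval.toUpperCase match {
    case "12H"              => true
    case "1D" | "2D" | "3D" => true
    case _                  => false
  }
```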

## Changes

- [x] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done

- [x] Manually tested on local Docker setup. Please include commands
run, and their output.
- [ ] Added new tests for the changes made.
- [x] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.
Updated unit tests for the SQL statements and tested in a local Docker setup:

```
scala> spark.sql("ALTER TABLE u_tableowner.test SET POLICY (REPLICATION=({destination:'a', interval:1D}))")
res6: org.apache.spark.sql.DataFrame = []
```
```
scala> spark.sql("ALTER TABLE u_tableowner.test SET POLICY (REPLICATION=({destination:'a', interval:12H}))")
res8: org.apache.spark.sql.DataFrame = []
```
Using anything other than `h/H` or `d/D` throws an exception:
```
scala> spark.sql("ALTER TABLE u_tableowner.test SET POLICY (REPLICATION=({destination:'a', interval:1}))")
com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseParseException: no viable alternative at input 'interval:1'; line 1 pos 82
```
```
scala> spark.sql("ALTER TABLE u_tableowner.test SET POLICY (REPLICATION=({destination:'a', interval:1Y}))")
com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseParseException: no viable alternative at input 'interval:1Y'; line 1 pos 82
  at com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseParseErrorListener$.syntaxError(OpenhouseSparkSqlExtensionsParser.scala:123)
  at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)
```
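
For callers that prefer to handle the parse failure rather than let it propagate, a hedged sketch (assuming a spark-shell session where `spark` is in scope; `trySetReplication` is illustrative, not part of the API):

```scala
import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseParseException

// Illustrative helper, not part of OpenHouse: attempts to set a
// replication policy and reports parser rejections instead of throwing.
def trySetReplication(table: String, interval: String): Boolean =
  try {
    spark.sql(
      s"ALTER TABLE $table SET POLICY (REPLICATION=({destination:'a', interval:$interval}))")
    true
  } catch {
    case e: OpenhouseParseException =>
      println(s"Rejected interval '$interval': ${e.getMessage}")
      false
  }
```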

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
chenselena authored Oct 22, 2024
1 parent d02b2ed · commit ba2f50a
Showing 3 changed files with 9 additions and 22 deletions.
Spark SQL test updates:

```diff
@@ -43,20 +43,20 @@ public void setupSpark() {
 
   @Test
   public void testSimpleSetReplicationPolicy() {
-    String replicationConfigJson = "[{\"destination\":\"a\", \"interval\":\"24H\"}]";
+    String replicationConfigJson = "[{\"destination\":\"a\", \"interval\":\"12H\"}]";
     Dataset<Row> ds =
         spark.sql(
             "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = "
-                + "({destination:'a', interval:24H}))");
+                + "({destination:'a', interval:12H}))");
     assert isPlanValid(ds, replicationConfigJson);
 
     // Test support with multiple clusters
     replicationConfigJson =
-        "[{\"destination\":\"a\", \"interval\":\"12H\"}, {\"destination\":\"aa\", \"interval\":\"12H\"}]";
+        "[{\"destination\":\"a\", \"interval\":\"12H\"}, {\"destination\":\"aa\", \"interval\":\"2D\"}]";
     ds =
         spark.sql(
             "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = "
-                + "({destination:'a', interval:12h}, {destination:'aa', interval:12H}))");
+                + "({destination:'a', interval:12h}, {destination:'aa', interval:2d}))");
     assert isPlanValid(ds, replicationConfigJson);
   }
 
@@ -187,7 +187,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
                 "ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({destination: 'aa', interval: '12h'}))")
             .show());
 
-    // Interval input does not follow 'h/H' format
+    // Interval input does not follow 'h/H' or 'd/D' format
     Assertions.assertThrows(
         OpenhouseParseException.class,
         () ->
@@ -196,22 +196,6 @@ public void testReplicationPolicyWithoutProperSyntax() {
                     "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '12'}))")
                 .show());
 
-    Assertions.assertThrows(
-        OpenhouseParseException.class,
-        () ->
-            spark
-                .sql(
-                    "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '1D'}))")
-                .show());
-
-    Assertions.assertThrows(
-        OpenhouseParseException.class,
-        () ->
-            spark
-                .sql(
-                    "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '12d'}))")
-                .show());
-
     // Missing cluster and interval values
     Assertions.assertThrows(
         OpenhouseParseException.class,
```
ANTLR grammar update:

```diff
@@ -102,6 +102,7 @@ replicationPolicyClusterClause
 
 replicationPolicyIntervalClause
     : INTERVAL ':' RETENTION_HOUR
+    | INTERVAL ':' RETENTION_DAY
     ;
 
 columnRetentionPolicyPatternClause
```
AST builder update:

```diff
@@ -115,7 +115,9 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
   }
 
   override def visitReplicationPolicyIntervalClause(ctx: ReplicationPolicyIntervalClauseContext): (String) = {
-    ctx.RETENTION_HOUR().getText.toUpperCase
+    if (ctx.RETENTION_HOUR() != null)
+      ctx.RETENTION_HOUR().getText.toUpperCase()
+    else ctx.RETENTION_DAY().getText.toUpperCase()
   }
 
   override def visitColumnRetentionPolicy(ctx: ColumnRetentionPolicyContext): (String, String) = {
```
