Extend OH SQL for snapshots retention #223

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.linkedin.openhouse.spark.e2e.extensions;

import static com.linkedin.openhouse.spark.MockHelpers.*;
import static com.linkedin.openhouse.spark.SparkTestBase.*;

import com.linkedin.openhouse.spark.SparkTestBase;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(SparkTestBase.class)
public class SetSnapshotsRetentionPolicyTest {
  @Test
  public void testSetSnapshotsRetentionPolicy() {
    Object existingTable =
        mockGetTableResponseBody(
            "dbSetSnapshotsRetention",
            "t1",
            "c1",
            "dbSetSnapshotsRetention.t1",
            "u1",
            mockTableLocation(
                TableIdentifier.of("dbSetSnapshotsRetention", "t1"),
                convertSchemaToDDLComponent(baseSchema),
                ""),
            "V1",
            baseSchema,
            null,
            null);
    Object existingOpenhouseTable =
        mockGetTableResponseBody(
            "dbSetSnapshotsRetention",
            "t1",
            "c1",
            "dbSetSnapshotsRetention.t1",
            "u1",
            mockTableLocationAfterOperation(
                TableIdentifier.of("dbSetSnapshotsRetention", "t1"),
                "ALTER TABLE %t SET TBLPROPERTIES ('openhouse.tableId'='t1')"),
            "V1",
            baseSchema,
            null,
            null);
    Object tableAfterSetSnapshotsRetention =
        mockGetTableResponseBody(
            "dbSetSnapshotsRetention",
            "t1",
            "c1",
            "dbSetSnapshotsRetention.t1",
            "u1",
            mockTableLocationAfterOperation(
                TableIdentifier.of("dbSetSnapshotsRetention", "t1"),
                "ALTER TABLE %t SET TBLPROPERTIES('policies'='{\"snapshotsRetention\":{\"timeCount\":24,\"granularity\":\"HOUR\",\"count\":10}}')"),
            "V2",
            baseSchema,
            null,
            null);
    mockTableService.enqueue(mockResponse(200, existingOpenhouseTable)); // doRefresh()
    mockTableService.enqueue(mockResponse(200, existingOpenhouseTable)); // doRefresh()
    mockTableService.enqueue(mockResponse(201, tableAfterSetSnapshotsRetention)); // doCommit()
    String ddlWithSchema =
        "ALTER TABLE openhouse.dbSetSnapshotsRetention.t1 SET POLICY (SNAPSHOTS_RETENTION TTL=24H COUNT=10)";
Collaborator:

  • shall we use something simpler, like "version" instead of "snapshots"? The latter is an overloaded term that can also be used to describe a non-partitioned table.
  • alternatively, what about SET POLICY RETAIN 100 HOURS or SET POLICY RETAIN 50 VERSIONS?
    Assertions.assertDoesNotThrow(() -> spark.sql(ddlWithSchema));
  }
}
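
End to end, the statement under test is an ordinary Spark SQL call; the enqueued mock responses stand in for the table service during refresh and commit. A minimal usage sketch against a live session (a sketch only; it assumes a SparkSession already configured with the OpenHouse SQL extensions and an openhouse catalog, which the test gets from SparkTestBase):

import org.apache.spark.sql.SparkSession

// Sketch: extension and catalog configuration are outside this diff.
val spark = SparkSession.builder().getOrCreate()
spark.sql("ALTER TABLE openhouse.dbSetSnapshotsRetention.t1 SET POLICY (SNAPSHOTS_RETENTION TTL=24H COUNT=10)")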
@@ -25,6 +25,7 @@ singleStatement
statement
    : ALTER TABLE multipartIdentifier SET POLICY '(' retentionPolicy (columnRetentionPolicy)? ')' #setRetentionPolicy
    | ALTER TABLE multipartIdentifier SET POLICY '(' sharingPolicy ')' #setSharingPolicy
+   | ALTER TABLE multipartIdentifier SET POLICY '(' snapshotsRetentionPolicy ')' #setSnapshotsRetentionPolicy
    | ALTER TABLE multipartIdentifier MODIFY columnNameClause SET columnPolicy #setColumnPolicyTag
    | GRANT privilege ON grantableResource TO principal #grantStatement
    | REVOKE privilege ON grantableResource FROM principal #revokeStatement
@@ -64,7 +65,7 @@ quotedIdentifier
;

nonReserved
-    : ALTER | TABLE | SET | POLICY | RETENTION | SHARING
+    : ALTER | TABLE | SET | POLICY | RETENTION | SHARING | SNAPSHOTS_RETENTION
| GRANT | REVOKE | ON | TO | SHOW | GRANTS | PATTERN | WHERE | COLUMN
;

@@ -131,6 +132,27 @@ policyTag
: PII | HC
;

+snapshotsRetentionPolicy
+    : SNAPSHOTS_RETENTION snapshotsCombinedRetention
+    ;
+
+snapshotsCombinedRetention
+    : snapshotsTTL (snapshotsCount)?
+    ;
+
+snapshotsTTL
+    : TTL '=' snapshotsTTLValue
+    ;
+
+snapshotsCount
+    : COUNT '=' POSITIVE_INTEGER
+    ;
+
+snapshotsTTLValue
+    : RETENTION_DAY
+    | RETENTION_HOUR
+    ;

ALTER: 'ALTER';
TABLE: 'TABLE';
SET: 'SET';
@@ -157,6 +179,9 @@ HC: 'HC';
MODIFY: 'MODIFY';
TAG: 'TAG';
NONE: 'NONE';
+TTL: 'TTL';
+COUNT: 'COUNT';
+SNAPSHOTS_RETENTION: 'SNAPSHOTS_RETENTION';

POSITIVE_INTEGER
: DIGIT+
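Taken together, the new rules make the TTL clause mandatory and the COUNT clause optional, with the TTL value reusing the existing RETENTION_DAY / RETENTION_HOUR literals. Two statements the extended grammar should therefore accept (a sketch with a placeholder table name; the 7D form assumes RETENTION_DAY matches day literals the way RETENTION_HOUR matches 24H in the test above):

// Sketch: statements the extended grammar should parse.
spark.sql("ALTER TABLE openhouse.db.t1 SET POLICY (SNAPSHOTS_RETENTION TTL=24H)")         // TTL only
spark.sql("ALTER TABLE openhouse.db.t1 SET POLICY (SNAPSHOTS_RETENTION TTL=7D COUNT=10)") // TTL plus snapshot count
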
@@ -2,7 +2,7 @@ package com.linkedin.openhouse.spark.sql.catalyst.parser.extensions

import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes
import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseSqlExtensionsParser._
-import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetRetentionPolicy, SetSharingPolicy, SetColumnPolicyTag, ShowGrantsStatement}
+import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetRetentionPolicy, SetSharingPolicy, SetSnapshotsRetentionPolicy, ShowGrantsStatement}
import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes.GrantableResourceType
import com.linkedin.openhouse.gen.tables.client.model.TimePartitionSpec
import org.antlr.v4.runtime.tree.ParseTree
@@ -18,10 +18,11 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh

  override def visitSetRetentionPolicy(ctx: SetRetentionPolicyContext): SetRetentionPolicy = {
    val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
-   val (granularity, count) = typedVisit[(String, Int)](ctx.retentionPolicy())
+   val retentionPolicy = ctx.retentionPolicy()
+   val (granularity, count) = typedVisit[(String, Int)](retentionPolicy)
    val (colName, colPattern) =
      if (ctx.columnRetentionPolicy() != null)
        typedVisit[(String, String)](ctx.columnRetentionPolicy())
      else (null, null)
    SetRetentionPolicy(tableName, granularity, count, Option(colName), Option(colPattern))
  }
@@ -94,8 +95,8 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
}
}

- override def visitColumnRetentionPolicyPatternClause(ctx: ColumnRetentionPolicyPatternClauseContext): (String) = {
-   (ctx.retentionColumnPatternClause().STRING().getText)
+ override def visitColumnRetentionPolicyPatternClause(ctx: ColumnRetentionPolicyPatternClauseContext): String = {
+   ctx.retentionColumnPatternClause().STRING().getText
}

override def visitSharingPolicy(ctx: SharingPolicyContext): String = {
@@ -115,7 +116,7 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
}

override def visitDuration(ctx: DurationContext): (String, Int) = {
-   val granularity = if (ctx.RETENTION_DAY != null) {
+   val granularity: String = if (ctx.RETENTION_DAY != null) {
TimePartitionSpec.GranularityEnum.DAY.getValue()
} else if (ctx.RETENTION_YEAR() != null) {
TimePartitionSpec.GranularityEnum.YEAR.getValue()
@@ -124,13 +125,47 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
} else {
TimePartitionSpec.GranularityEnum.HOUR.getValue()
}

val count = ctx.getText.substring(0, ctx.getText.length - 1).toInt
(granularity, count)
}

+  override def visitSetSnapshotsRetentionPolicy(ctx: SetSnapshotsRetentionPolicyContext): SetSnapshotsRetentionPolicy = {
+    val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
+    val (granularity, timeCount, count) = typedVisit[(String, Int, Int)](ctx.snapshotsRetentionPolicy())
+    SetSnapshotsRetentionPolicy(tableName, granularity, timeCount, count)
+  }
+
+  override def visitSnapshotsRetentionPolicy(ctx: SnapshotsRetentionPolicyContext): (String, Int, Int) = {
+    typedVisit[(String, Int, Int)](ctx.snapshotsCombinedRetention())
+  }
+
+  override def visitSnapshotsCombinedRetention(ctx: SnapshotsCombinedRetentionContext): (String, Int, Int) = {
+    val snapshotsTTL = ctx.snapshotsTTL()
+    val (granularity, timeCount) = typedVisit[(String, Int)](snapshotsTTL)
+    val count =
+      if (ctx.snapshotsCount() != null) {
+        typedVisit[Int](ctx.snapshotsCount())
+      } else 0
+    (granularity, timeCount, count)
+  }
+
+  override def visitSnapshotsTTL(ctx: SnapshotsTTLContext): (String, Int) = {
+    val ttl = ctx.snapshotsTTLValue()
+    val granularity: String = if (ttl.RETENTION_DAY() != null) {
+      TimePartitionSpec.GranularityEnum.DAY.getValue()
+    } else {
+      TimePartitionSpec.GranularityEnum.HOUR.getValue()
+    }
+    val count = ttl.getText.substring(0, ttl.getText.length - 1).toInt
+    (granularity, count)
+  }
+
+  override def visitSnapshotsCount(ctx: SnapshotsCountContext): Integer = {
+    ctx.POSITIVE_INTEGER().getText.toInt
+  }

private def toBuffer[T](list: java.util.List[T]) = list.asScala
- private def toSeq[T](list: java.util.List[T]): Seq[T] = toBuffer(list).toSeq
+ private def toSeq[T](list: java.util.List[T]) = toBuffer(list).toSeq

private def typedVisit[T](ctx: ParseTree): T = {
ctx.accept(this).asInstanceOf[T]
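visitSnapshotsTTL above splits the TTL literal positionally: the matched lexer token decides the granularity, and dropping the trailing unit character leaves the numeric count, mirroring the existing visitDuration. A standalone sketch of that handling (hypothetical helper for illustration; the "HOUR"/"DAY" strings match the GranularityEnum values implied by the expected JSON in the test above):

// Hypothetical mirror of visitSnapshotsTTL's literal handling, for illustration only.
def parseTtl(ttl: String): (String, Int) = {
  val granularity = if (ttl.endsWith("D")) "DAY" else "HOUR" // stands in for the RETENTION_DAY/RETENTION_HOUR token check
  (granularity, ttl.dropRight(1).toInt)                      // "24H" -> ("HOUR", 24), "7D" -> ("DAY", 7)
}
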
@@ -0,0 +1,9 @@
package com.linkedin.openhouse.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.plans.logical.Command

case class SetSnapshotsRetentionPolicy(tableName: Seq[String], granularity: String, timeCount: Int, count: Int) extends Command {
  override def simpleString(maxFields: Int): String = {
    s"SetSnapshotsRetentionPolicy: ${tableName} ${timeCount} ${granularity} ${count}"
  }
}
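
The plan node is a leaf Command that simply carries the parsed values; simpleString is what EXPLAIN renders. For the DDL in the test, the AST builder would produce a node like this (illustrative; values copied from that statement):

// Illustrative: the node built for TTL=24H COUNT=10 on openhouse.dbSetSnapshotsRetention.t1.
val plan = SetSnapshotsRetentionPolicy(Seq("openhouse", "dbSetSnapshotsRetention", "t1"), "HOUR", 24, 10)
plan.simpleString(maxFields = 10)
// SetSnapshotsRetentionPolicy: List(openhouse, dbSetSnapshotsRetention, t1) 24 HOUR 10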
@@ -1,6 +1,6 @@
package com.linkedin.openhouse.spark.sql.execution.datasources.v2

-import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetRetentionPolicy, SetSharingPolicy, SetColumnPolicyTag, ShowGrantsStatement}
+import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetRetentionPolicy, SetSharingPolicy, SetSnapshotsRetentionPolicy, ShowGrantsStatement}
import org.apache.iceberg.spark.{Spark3Util, SparkCatalog, SparkSessionCatalog}
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
@@ -15,6 +15,8 @@ case class OpenhouseDataSourceV2Strategy(spark: SparkSession) extends Strategy w
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case SetRetentionPolicy(CatalogAndIdentifierExtractor(catalog, ident), granularity, count, colName, colPattern) =>
      SetRetentionPolicyExec(catalog, ident, granularity, count, colName, colPattern) :: Nil
+   case SetSnapshotsRetentionPolicy(CatalogAndIdentifierExtractor(catalog, ident), granularity, timeCount, count) =>
+     SetSnapshotsRetentionPolicyExec(catalog, ident, granularity, timeCount, count) :: Nil
    case SetSharingPolicy(CatalogAndIdentifierExtractor(catalog, ident), sharing) =>
      SetSharingPolicyExec(catalog, ident, sharing) :: Nil
    case SetColumnPolicyTag(CatalogAndIdentifierExtractor(catalog, ident), policyTag, cols) =>
@@ -0,0 +1,45 @@
package com.linkedin.openhouse.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec

case class SetSnapshotsRetentionPolicyExec(
    catalog: TableCatalog,
    ident: Identifier,
    granularity: String,
    timeCount: Int,
    count: Int
) extends V2CommandExec {

  override lazy val output: Seq[Attribute] = Nil

  override protected def run(): Seq[InternalRow] = {
    catalog.loadTable(ident) match {
      case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") =>
        val key = "updated.openhouse.policy"
        val value = count match {
          case 0 => s"""{"snapshotsRetention":{"timeCount":${timeCount},"granularity":"${granularity}"}}"""
          case _ => s"""{"snapshotsRetention":{"timeCount":${timeCount}, "granularity":"${granularity}", "count":${count}}}"""
        }

        iceberg.table().updateProperties()
          .set(key, value)
          .commit()

      case table =>
        throw new UnsupportedOperationException(s"Cannot set snapshots retention policy for non-Openhouse table: $table")
    }

    Nil
  }

  override def simpleString(maxFields: Int): String = {
    s"SetSnapshotsRetentionPolicyExec: ${catalog} ${ident} ${timeCount} ${granularity} ${count}"
  }
}
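
The policy value is assembled with plain string interpolation, and the count field is omitted when COUNT was not supplied (the AST builder's 0 sentinel). A standalone reproduction of the two shapes (same interpolations as the branches above, runnable on its own):

// Reproduces the value written under "updated.openhouse.policy".
def policyJson(granularity: String, timeCount: Int, count: Int): String =
  if (count == 0) s"""{"snapshotsRetention":{"timeCount":$timeCount,"granularity":"$granularity"}}"""
  else s"""{"snapshotsRetention":{"timeCount":$timeCount, "granularity":"$granularity", "count":$count}}"""

println(policyJson("HOUR", 24, 10))
// {"snapshotsRetention":{"timeCount":24, "granularity":"HOUR", "count":10}}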