[WIP] Supporting CTAS queries for Hive to Spark query translations #324

Open
wants to merge 11 commits into base: master
@@ -118,7 +118,6 @@ protected ToRelConverter(Map<String, Map<String, List<String>>> localMetaStore)
public RelNode convertSql(String sql) {
return toRel(toSqlNode(sql));
}

Collaborator: Add this empty line back?

/**
* Similar to {@link #convertSql(String)} but converts hive view definition stored
* in the hive metastore to corresponding {@link RelNode} implementation.
@@ -133,8 +132,6 @@ public RelNode convertView(String hiveDbName, String hiveViewName) {
return toRel(sqlNode);
}

// TODO change back to protected once the relevant tests move to the common package
@VisibleForTesting
public SqlNode toSqlNode(String sql) {
return toSqlNode(sql, null);
}
@@ -161,8 +158,7 @@ public SqlNode processView(String dbName, String tableName) {
return toSqlNode(stringViewExpandedText, table);
}

@VisibleForTesting
protected RelNode toRel(SqlNode sqlNode) {
public RelNode toRel(SqlNode sqlNode) {
RelRoot root = getSqlToRelConverter().convertQuery(sqlNode, true, true);
return standardizeRel(root.rel);
}
@@ -0,0 +1,10 @@
package com.linkedin.coral.common.calcite.sql;

import org.apache.calcite.sql.SqlNode;

public interface SqlCommand {
ljfgem marked this conversation as resolved.

public SqlNode getSelectQuery();

public void setSelectQuery(SqlNode selectQuery);
Collaborator: Modifier 'public' is redundant for interface methods.

}
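
For context, a minimal sketch of how a translation layer could use this interface to rewrite only the inner query of a CTAS node (hypothetical helper, not part of this PR):

import java.util.function.UnaryOperator;
import org.apache.calcite.sql.SqlNode;

// Hypothetical helper: if the node is a DDL command wrapping a select,
// translate just the inner query and put the result back on the command.
static SqlNode rewriteInnerQuery(SqlNode node, UnaryOperator<SqlNode> translate) {
  if (node instanceof SqlCommand) {
    SqlCommand command = (SqlCommand) node;
    command.setSelectQuery(translate.apply(command.getSelectQuery()));
    return node;
  }
  return translate.apply(node);
}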
@@ -0,0 +1,101 @@
package com.linkedin.coral.common.calcite.sql;

import com.linkedin.coral.javax.annotation.Nullable;
import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import java.util.List;
import java.util.Objects;


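/**
 * Calcite {@link SqlNode} for a Hive CREATE TABLE statement, including the
 * CTAS form ("CREATE TABLE ... AS query") plus optional serde, row format,
 * and file format clauses.
 */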
public class SqlCreateTable extends SqlCreate implements SqlCommand {
Collaborator: Could you add a java doc for this class?

// name of the table to be created
Collaborator: nit: space after //

private final SqlIdentifier name;
// column details like column name, data type, etc. This may be null, like in case of CTAS
private final @Nullable SqlNodeList columnList;
// select query node in case of "CREATE TABLE ... AS query"; else may be null
private @Nullable SqlNode selectQuery;
// specifying serde property
private final @Nullable SqlNode serDe;
// specifying file format such as Parquet, ORC, etc.
private final @Nullable SqlNodeList fileFormat;
// specifying delimiter fields for row format
private final @Nullable SqlCharStringLiteral rowFormat;

private static final SqlOperator OPERATOR =
new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE);

/** Creates a SqlCreateTable. */
public SqlCreateTable(SqlParserPos pos, boolean replace, boolean ifNotExists,
SqlIdentifier name, @Nullable SqlNodeList columnList, @Nullable SqlNode selectQuery,
@Nullable SqlNode serDe, @Nullable SqlNodeList fileFormat, @Nullable SqlCharStringLiteral rowFormat) {
super(OPERATOR, pos, replace, ifNotExists);
this.name = Objects.requireNonNull(name, "name");
nimesh1601 marked this conversation as resolved.
this.columnList = columnList;
this.selectQuery = selectQuery;
this.serDe = serDe;
this.fileFormat = fileFormat;
this.rowFormat = rowFormat;
}

@SuppressWarnings("nullness")
@Override public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(name, columnList, selectQuery, serDe, fileFormat, rowFormat);
}

@Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
ljfgem marked this conversation as resolved.
writer.keyword("CREATE");
nimesh1601 marked this conversation as resolved.
writer.keyword("TABLE");
if (ifNotExists) {
writer.keyword("IF NOT EXISTS");
}
name.unparse(writer, leftPrec, rightPrec);
if (columnList != null) {
SqlWriter.Frame frame = writer.startList("(", ")");
for (SqlNode c : columnList) {
writer.sep(",");
c.unparse(writer, 0, 0);
}
writer.endList(frame);
}
if (serDe != null) {
writer.keyword("ROW FORMAT SERDE");
serDe.unparse(writer, 0, 0);
writer.newlineAndIndent();
}
if (rowFormat != null) {
writer.keyword("ROW FORMAT DELIMITED FIELDS TERMINATED BY");
rowFormat.unparse(writer, 0, 0);
writer.newlineAndIndent();
}
if (fileFormat != null) {
if (fileFormat.size() == 1) {
writer.keyword("STORED AS");
fileFormat.get(0).unparse(writer, 0, 0);
writer.newlineAndIndent();
} else {
writer.keyword("STORED AS INPUTFORMAT");
fileFormat.get(0).unparse(writer, 0, 0);
writer.keyword("OUTPUTFORMAT");
fileFormat.get(1).unparse(writer, 0, 0);
writer.newlineAndIndent();
nimesh1601 marked this conversation as resolved.
}
}
if (selectQuery != null) {
writer.keyword("AS");
writer.newlineAndIndent();
selectQuery.unparse(writer, 0, 0);
}
}

@Override
public SqlNode getSelectQuery() {
return selectQuery;
}

@Override
public void setSelectQuery(SqlNode query) {
this.selectQuery = query;
}
}
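
For a CTAS node carrying just a table name, a single file format, and a select query, unparse emits SQL along these lines (identifiers are illustrative; exact whitespace depends on the SqlWriter):

CREATE TABLE IF NOT EXISTS db.t STORED AS ORC
AS
SELECT a, b FROM db.s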
@@ -302,12 +302,93 @@ protected R visit(ASTNode node, C ctx) {
case HiveParser.KW_CURRENT:
return visitCurrentRow(node, ctx);

case HiveParser.TOK_CREATETABLE:
return visitCreateTable(node, ctx);
case HiveParser.TOK_LIKETABLE:
return visitLikeTable(node, ctx);
case HiveParser.TOK_IFNOTEXISTS:
return visitIfNotExists(node, ctx);
case HiveParser.TOK_TABCOLLIST:
return visitColumnList(node, ctx);
case HiveParser.TOK_TABCOL:
return visitColumn(node, ctx);
case HiveParser.TOK_FILEFORMAT_GENERIC:
return visitFileFormatGeneric(node, ctx);
case HiveParser.TOK_TABLEFILEFORMAT:
return visitTableFileFormat(node, ctx);
case HiveParser.TOK_TABLESERIALIZER:
return visitTableSerializer(node, ctx);
case HiveParser.TOK_SERDENAME:
return visitSerdeName(node, ctx);
case HiveParser.TOK_TABLEROWFORMAT:
return visitTableRowFormat(node, ctx);
case HiveParser.TOK_SERDEPROPS:
return visitSerdeProps(node, ctx);
case HiveParser.TOK_TABLEROWFORMATFIELD:
return visitTableRowFormatField(node, ctx);
default:
// return visitChildren(node, ctx);
throw new UnhandledASTTokenException(node);
}
}

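// Defaults for the new CREATE TABLE tokens below: each delegates to its first
// child (or returns null when there are no children), so language-specific
// subclasses can override only the visits they need.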
protected R visitTableRowFormatField(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}

protected R visitSerdeProps(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}

protected R visitTableRowFormat(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}

protected R visitSerdeName(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}

protected R visitTableSerializer(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}

protected R visitTableFileFormat(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}

protected R visitFileFormatGeneric(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}

protected R visitColumn(ASTNode node, C ctx) {
if (node.getChildren() != null) {
return visitChildren(node, ctx).get(0);
}
return null;
}
nimesh1601 marked this conversation as resolved.

protected R visitColumnList(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}

protected R visitIfNotExists(ASTNode node, C ctx) {
if (node.getChildren() != null) {
return visitChildren(node, ctx).get(0);
}
return null;
}

protected R visitLikeTable(ASTNode node, C ctx) {
if (node.getChildren() != null) {
return visitChildren(node, ctx).get(0);
}
return null;
}

protected R visitCreateTable(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}

protected R visitKeywordLiteral(ASTNode node, C ctx) {
return visitChildren(node, ctx).get(0);
}
@@ -9,10 +9,12 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Arrays;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import com.linkedin.coral.common.calcite.sql.SqlCreateTable;
import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.sql.JoinConditionType;
import org.apache.calcite.sql.JoinType;
@@ -39,6 +41,7 @@
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.hadoop.hive.metastore.api.Table;

import com.linkedin.coral.com.google.common.collect.ImmutableList;
@@ -608,6 +611,96 @@ protected SqlNode visitSelect(ASTNode node, ParseContext ctx) {
return ctx.selects;
}

@Override
protected SqlNode visitCreateTable(ASTNode node, ParseContext ctx) {
CreateTableOptions ctOptions = new CreateTableOptions();
for (Node child : node.getChildren()) {
ASTNode ast = (ASTNode) child;
switch (ast.getType()) {
case HiveParser.TOK_TABNAME:
ctOptions.name = (SqlIdentifier) visitTabnameNode(ast, ctx);
break;
case HiveParser.TOK_IFNOTEXISTS:
ctOptions.ifNotExists = true;
break;
case HiveParser.TOK_TABCOLLIST:
ctOptions.columnList = (SqlNodeList) visitColumnList(ast, ctx);
break;
case HiveParser.TOK_QUERY:
ctOptions.query = visitQueryNode(ast, ctx);
break;
case HiveParser.TOK_TABLESERIALIZER:
ctOptions.tableSerializer = visitTableSerializer(ast, ctx);
break;
case HiveParser.TOK_TABLEFILEFORMAT:
ctOptions.tableFileFormat = (SqlNodeList) visitTableFileFormat(ast, ctx);
break;
case HiveParser.TOK_FILEFORMAT_GENERIC:
ctOptions.tableFileFormat = (SqlNodeList) visitFileFormatGeneric(ast, ctx);
break;
case HiveParser.TOK_TABLEROWFORMAT:
ctOptions.tableRowFormat = (SqlCharStringLiteral) visitTableRowFormat(ast, ctx);
break;
default:
break;
}
}
return new SqlCreateTable(ZERO, false, ctOptions.ifNotExists != null ? ctOptions.ifNotExists : false, ctOptions.name,
Contributor: Can we introduce a utility class like calcite does here https://github.com/apache/calcite/blob/main/core/src/main/java/org/apache/calcite/sql/ddl/SqlDdlNodes.java to manage all ddl sqlNode creation?
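
A minimal sketch of what such a factory could look like against the SqlCreateTable constructor in this PR (CoralDdlNodes is a hypothetical name and location):

package com.linkedin.coral.common.calcite.sql;

import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParserPos;

import com.linkedin.coral.javax.annotation.Nullable;

// Hypothetical factory mirroring Calcite's SqlDdlNodes: one place that owns
// construction of every DDL SqlNode Coral supports.
public final class CoralDdlNodes {
  private CoralDdlNodes() {
  }

  // Null options are allowed and are simply omitted when the node is unparsed.
  public static SqlCreateTable createTable(SqlParserPos pos, boolean replace, boolean ifNotExists,
      SqlIdentifier name, @Nullable SqlNodeList columnList, @Nullable SqlNode selectQuery,
      @Nullable SqlNode serDe, @Nullable SqlNodeList fileFormat, @Nullable SqlCharStringLiteral rowFormat) {
    return new SqlCreateTable(pos, replace, ifNotExists, name, columnList, selectQuery, serDe, fileFormat,
        rowFormat);
  }
}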

Contributor: @aastha25 Why does DDL SqlNode creation warrant a separate path? Why cannot it be done through generic SqlNode generation?

ctOptions.columnList, ctOptions.query, ctOptions.tableSerializer, ctOptions.tableFileFormat, ctOptions.tableRowFormat);
}

@Override
protected SqlNode visitColumnList(ASTNode node, ParseContext ctx) {
List<SqlNode> sqlNodeList = visitChildren(node, ctx);
return new SqlNodeList(sqlNodeList, ZERO);
}

@Override
protected SqlNode visitColumn(ASTNode node, ParseContext ctx) {
return visitChildren(node, ctx).get(0);
}

@Override
protected SqlNode visitIfNotExists(ASTNode node, ParseContext ctx) {
return SqlLiteral.createBoolean(true, ZERO);
}

@Override
protected SqlNode visitTableRowFormat(ASTNode node, ParseContext ctx) {
return visitChildren(node, ctx).get(0);
}

@Override
protected SqlNode visitSerdeName(ASTNode node, ParseContext ctx) {
return visit((ASTNode) node.getChildren().get(0), ctx);
}

@Override
protected SqlNode visitTableSerializer(ASTNode node, ParseContext ctx) {
return visitChildren(node, ctx).get(0);
}

@Override
protected SqlNode visitTableFileFormat(ASTNode node, ParseContext ctx) {
List<SqlNode> sqlNodeList = visitChildren(node, ctx);
return new SqlNodeList(sqlNodeList, ZERO);
}

@Override
protected SqlNode visitFileFormatGeneric(ASTNode node, ParseContext ctx) {
return new SqlNodeList(Arrays.asList(visitChildren(node, ctx).get(0)), ZERO);
}

@Override
protected SqlNode visitSerdeProps(ASTNode node, ParseContext ctx) {
return visitChildren(node, ctx).get(0);
}

@Override
protected SqlNode visitTableRowFormatField(ASTNode node, ParseContext ctx) {
return visitChildren(node, ctx).get(0);
}

@Override
protected SqlNode visitTabRefNode(ASTNode node, ParseContext ctx) {
List<SqlNode> sqlNodes = visitChildren(node, ctx);
@@ -1059,4 +1152,14 @@ Optional<Table> getHiveTable() {
return hiveTable;
}
}
}

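/** Mutable holder for the options gathered from a CREATE TABLE AST before the SqlCreateTable node is built. */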
class CreateTableOptions {
nimesh1601 marked this conversation as resolved.
SqlIdentifier name;
SqlNodeList columnList;
SqlNode query;
Boolean ifNotExists;
SqlNode tableSerializer;
SqlNodeList tableFileFormat;
SqlCharStringLiteral tableRowFormat;
}
nimesh1601 marked this conversation as resolved.
}