From cdbfc93a7bd75a319079387b4f15762b82f85c32 Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Wed, 27 Mar 2024 14:04:06 -0500 Subject: [PATCH] Drop explicit projection from pipelines --- .../linkedin/hoptimator/catalog/ScriptImplementor.java | 10 ++++++++-- .../linkedin/hoptimator/planner/HoptimatorPlanner.java | 4 ++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java index 2a16543..b847e8c 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java @@ -137,7 +137,13 @@ public void implement(SqlWriter w) { RelToSqlConverter converter = new RelToSqlConverter(w.getDialect()); SqlImplementor.Result result = converter.visitRoot(relNode); SqlSelect select = result.asSelect(); - if (select.getSelectList() != null) { + // FIXME: workaround for Flink SQL limitation fixed in 1.18 + // N.B. Flink SQL previous to 1.18 does not support `ROW(...)` with + // arbitrary expressions inside. This means Flink SQL might choke on + // some expressions anytime there is a `ROW`. So we replace `ROW()` + // with the "implicit" row constructor `()`. However, the implicit + // row constructor only works for lists greater than length 1! + if (select.getSelectList() != null && select.getSelectList().size() > 1) { select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR)); } w.literal(select.toSqlString(w.getDialect()).getSql()); @@ -146,7 +152,7 @@ public void implement(SqlWriter w) { // A `ROW(...)` operator which will unparse as just `(...)`. private final SqlRowOperator SILENT_COLUMN_LIST = new SqlRowOperator(""); // empty string name - // a shuttle that replaces `Row(...)` with just `(...)` + // a shuttle that replaces `ROW(...)` with just `(...)` private final SqlShuttle REMOVE_ROW_CONSTRUCTOR = new SqlShuttle() { @Override public SqlNode visit(SqlCall call) { diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java index 9437a57..9dd1139 100644 --- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java +++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java @@ -92,7 +92,7 @@ public PipelineRel pipeline(String sql) throws Exception { Planner planner = Frameworks.getPlanner(calciteFrameworkConfig); SqlNode parsed = planner.parse(sql); SqlNode validated = planner.validate(parsed); - RelNode logicalPlan = planner.rel(validated).project(); + RelNode logicalPlan = planner.rel(validated).rel;//.project(); RelTraitSet traitSet = logicalPlan.getTraitSet(); traitSet = traitSet.simplify(); PipelineRel pipelineRel = (PipelineRel) planner.transform(0, traitSet.replace(PipelineRel.CONVENTION), logicalPlan); @@ -104,7 +104,7 @@ public RelNode logical(String sql) throws Exception { Planner planner = Frameworks.getPlanner(calciteFrameworkConfig); SqlNode parsed = planner.parse(sql); SqlNode validated = planner.validate(parsed); - RelNode logicalPlan = planner.rel(validated).project(); + RelNode logicalPlan = planner.rel(validated).rel;//.project(); RelTraitSet traitSet = logicalPlan.getTraitSet(); planner.close(); return logicalPlan;