Skip to content

Commit

Permalink
Drop explicit projection from pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Mar 27, 2024
1 parent e1ad4b5 commit cdbfc93
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,13 @@ public void implement(SqlWriter w) {
RelToSqlConverter converter = new RelToSqlConverter(w.getDialect());
SqlImplementor.Result result = converter.visitRoot(relNode);
SqlSelect select = result.asSelect();
if (select.getSelectList() != null) {
// FIXME: workaround for Flink SQL limitation fixed in 1.18
// N.B. Flink SQL previous to 1.18 does not support `ROW(...)` with
// arbitrary expressions inside. This means Flink SQL might choke on
// some expressions anytime there is a `ROW`. So we replace `ROW()`
// with the "implicit" row constructor `()`. However, the implicit
// row constructor only works for lists greater than length 1!
if (select.getSelectList() != null && select.getSelectList().size() > 1) {
select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR));
}
w.literal(select.toSqlString(w.getDialect()).getSql());
Expand All @@ -146,7 +152,7 @@ public void implement(SqlWriter w) {
// A `ROW(...)` operator which will unparse as just `(...)`.
private final SqlRowOperator SILENT_COLUMN_LIST = new SqlRowOperator(""); // empty string name

// a shuttle that replaces `Row(...)` with just `(...)`
// a shuttle that replaces `ROW(...)` with just `(...)`
private final SqlShuttle REMOVE_ROW_CONSTRUCTOR = new SqlShuttle() {
@Override
public SqlNode visit(SqlCall call) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public PipelineRel pipeline(String sql) throws Exception {
Planner planner = Frameworks.getPlanner(calciteFrameworkConfig);
SqlNode parsed = planner.parse(sql);
SqlNode validated = planner.validate(parsed);
RelNode logicalPlan = planner.rel(validated).project();
RelNode logicalPlan = planner.rel(validated).rel;//.project();
RelTraitSet traitSet = logicalPlan.getTraitSet();
traitSet = traitSet.simplify();
PipelineRel pipelineRel = (PipelineRel) planner.transform(0, traitSet.replace(PipelineRel.CONVENTION), logicalPlan);
Expand All @@ -104,7 +104,7 @@ public RelNode logical(String sql) throws Exception {
Planner planner = Frameworks.getPlanner(calciteFrameworkConfig);
SqlNode parsed = planner.parse(sql);
SqlNode validated = planner.validate(parsed);
RelNode logicalPlan = planner.rel(validated).project();
RelNode logicalPlan = planner.rel(validated).rel;//.project();
RelTraitSet traitSet = logicalPlan.getTraitSet();
planner.close();
return logicalPlan;
Expand Down

0 comments on commit cdbfc93

Please sign in to comment.