Skip to content

Commit

Permalink
Add support for pushing down complex filters to scan
Browse files Browse the repository at this point in the history
    - Push down complex filters to the scan
    - Buffer the output of the scan so that only sufficiently
      full buffers are passed on to other operators
  • Loading branch information
arhamchopra committed Dec 2, 2023
1 parent c634fd5 commit e7a3b43
Show file tree
Hide file tree
Showing 22 changed files with 284 additions and 21 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -343,4 +343,8 @@ zig-cache/*
extension/extension_config_local.cmake

# extension_external dir
extension_external
extension_external

*.cmake
*Makefile
*CMakeFiles
1 change: 1 addition & 0 deletions src/common/enums/optimizer_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ static DefaultOptimizerType internal_optimizer_types[] = {
{"compressed_materialization", OptimizerType::COMPRESSED_MATERIALIZATION},
{"duplicate_groups", OptimizerType::DUPLICATE_GROUPS},
{"reorder_filter", OptimizerType::REORDER_FILTER},
{"heuristic_operator_fusion", OptimizerType::HEURISTIC_OPERATOR_FUSION},
{"extension", OptimizerType::EXTENSION},
{nullptr, OptimizerType::INVALID}};

Expand Down
6 changes: 6 additions & 0 deletions src/execution/column_binding_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "duckdb/planner/operator/logical_any_join.hpp"
#include "duckdb/planner/operator/logical_create_index.hpp"
#include "duckdb/planner/operator/logical_insert.hpp"
#include "duckdb/planner/operator/logical_get.hpp"
#include "duckdb/planner/operator/logical_extension_operator.hpp"

#include "duckdb/planner/expression/bound_columnref_expression.hpp"
Expand Down Expand Up @@ -68,6 +69,11 @@ void ColumnBindingResolver::VisitOperator(LogicalOperator &op) {
//! We first need to update the current set of bindings and then visit operator expressions
bindings = op.GetColumnBindings();
VisitOperatorExpressions(op);
auto& logical_get = op.Cast<LogicalGet>();
auto& tf = logical_get.table_filters;
if (tf.complex_filter) {
VisitExpression(&tf.complex_filter);
}
return;
}
case LogicalOperatorType::LOGICAL_INSERT: {
Expand Down
2 changes: 2 additions & 0 deletions src/execution/expression_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "duckdb/execution/execution_context.hpp"
#include "duckdb/storage/statistics/base_statistics.hpp"
#include "duckdb/planner/expression/list.hpp"
#include <iostream>

namespace duckdb {

Expand Down Expand Up @@ -142,6 +143,7 @@ void ExpressionExecutor::Verify(const Expression &expr, Vector &vector, idx_t co

unique_ptr<ExpressionState> ExpressionExecutor::InitializeState(const Expression &expr,
ExpressionExecutorState &state) {

switch (expr.expression_class) {
case ExpressionClass::BOUND_REF:
return InitializeState(expr.Cast<BoundReferenceExpression>(), state);
Expand Down
5 changes: 5 additions & 0 deletions src/execution/operator/scan/physical_table_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,12 @@ string PhysicalTableScan::ParamsToString() const {
result += "\n";
}
}
if (table_filters->complex_filter) {
result += table_filters->complex_filter->ToString();
result += "\n";
}
}

if (!extra_info.file_filters.empty()) {
result += "\n[INFOSEPARATOR]\n";
result += "File Filters: " + extra_info.file_filters;
Expand Down
6 changes: 6 additions & 0 deletions src/execution/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
#include "duckdb/parallel/thread_context.hpp"
#include "duckdb/storage/buffer_manager.hpp"


bool disable_caching = true;

namespace duckdb {

string PhysicalOperator::GetName() const {
Expand Down Expand Up @@ -252,6 +255,9 @@ OperatorResultType CachingPhysicalOperator::Execute(ExecutionContext &context, D

// Execute child operator
auto child_result = ExecuteInternal(context, input, chunk, gstate, state);
if (disable_caching) {
return child_result;
}

#if STANDARD_VECTOR_SIZE >= 128
if (!state.initialized) {
Expand Down
6 changes: 5 additions & 1 deletion src/execution/physical_plan/plan_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include "duckdb/planner/expression/bound_reference_expression.hpp"
#include "duckdb/planner/operator/logical_get.hpp"

#include <iostream>

namespace duckdb {

unique_ptr<TableFilterSet> CreateTableFilterSet(TableFilterSet &table_filters, vector<column_t> &column_ids) {
Expand All @@ -26,6 +28,8 @@ unique_ptr<TableFilterSet> CreateTableFilterSet(TableFilterSet &table_filters, v
}
table_filter_set->filters[column_index] = std::move(table_filter.second);
}
table_filter_set->complex_filter = std::move(table_filters.complex_filter);
table_filter_set->used_col_ids = std::move(table_filters.used_col_ids);
return table_filter_set;
}

Expand All @@ -43,7 +47,7 @@ unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalGet &op) {
}

unique_ptr<TableFilterSet> table_filters;
if (!op.table_filters.filters.empty()) {
if (!op.table_filters.filters.empty() || op.table_filters.complex_filter) {
table_filters = CreateTableFilterSet(op.table_filters, op.column_ids);
}

Expand Down
1 change: 1 addition & 0 deletions src/include/duckdb/common/enums/optimizer_type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ enum class OptimizerType : uint32_t {
COMPRESSED_MATERIALIZATION,
DUPLICATE_GROUPS,
REORDER_FILTER,
HEURISTIC_OPERATOR_FUSION,
EXTENSION
};

Expand Down
6 changes: 5 additions & 1 deletion src/include/duckdb/optimizer/filter_combiner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ enum class FilterResult { UNSATISFIABLE, SUCCESS, UNSUPPORTED };
//! (1) it prunes obsolete filter conditions: i.e. [X > 5 and X > 7] => [X > 7]
//! (2) it generates new filters for expressions in the same equivalence set: i.e. [X = Y and X = 500] => [Y = 500]
//! (3) it prunes branches that have unsatisfiable filters: i.e. [X = 5 AND X > 6] => FALSE, prune branch
class FilterCombiner {
class FilterCombiner : private LogicalOperatorVisitor {
public:
explicit FilterCombiner(ClientContext &context);
explicit FilterCombiner(Optimizer &optimizer);
Expand Down Expand Up @@ -94,8 +94,12 @@ class FilterCombiner {
// }

private:
unique_ptr<Expression> VisitReplace(BoundColumnRefExpression &ref, unique_ptr<Expression> *expr_ptr) override;

vector<unique_ptr<Expression>> remaining_filters;

unordered_set<idx_t> referenced_col_ids;

expression_map_t<unique_ptr<Expression>> stored_expressions;
expression_map_t<idx_t> equivalence_set_map;
unordered_map<idx_t, vector<ExpressionValueInformation>> constant_values;
Expand Down
13 changes: 13 additions & 0 deletions src/include/duckdb/optimizer/heuristic_operator_fusion.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once

#include "duckdb.hpp"
#include "duckdb/optimizer/optimizer_extension.hpp"

namespace duckdb {

//! Optimizer pass object exposing a single Rewrite entry point over a logical plan.
//! The class holds no state of its own.
class HeuristicOperatorFusion {
public:
	HeuristicOperatorFusion() = default;

	//! Takes the logical plan by (non-const) reference and returns a logical plan.
	//! NOTE(review): the parameter is an lvalue reference rather than a by-value sink, so callers
	//! should treat `plan` as consumed and re-assign it from the return value.
	duckdb::unique_ptr<LogicalOperator> Rewrite(duckdb::unique_ptr<LogicalOperator> &plan);
};
}
14 changes: 14 additions & 0 deletions src/include/duckdb/planner/table_filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

#pragma once

#include <algorithm>

#include "duckdb/common/common.hpp"
#include "duckdb/common/types.hpp"
#include "duckdb/common/unordered_map.hpp"
#include "duckdb/common/enums/filter_propagate_result.hpp"
#include "duckdb/planner/expression.hpp"

namespace duckdb {
class BaseStatistics;
Expand Down Expand Up @@ -66,8 +69,19 @@ class TableFilter {
class TableFilterSet {
public:
unordered_map<idx_t, unique_ptr<TableFilter>> filters;
unique_ptr<Expression> complex_filter;
vector<bool> used_col_ids;

public:
TableFilterSet(): complex_filter(nullptr) { }
TableFilterSet(int no_cols): complex_filter(nullptr) { used_col_ids.resize(no_cols); }
void SetColumnsIds(vector<idx_t> col_ids) {
for(auto col_id: col_ids) {
D_ASSERT(used_col_ids.size() > col_id);
used_col_ids[col_id] = true;
}
}

void PushFilter(idx_t table_index, unique_ptr<TableFilter> filter);

bool Equals(TableFilterSet &other) {
Expand Down
2 changes: 2 additions & 0 deletions src/include/duckdb/storage/table/scan_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ class CollectionScanState {
//! The current batch index
idx_t batch_index;

DataChunk cached_data;

public:
void Initialize(const vector<LogicalType> &types);
const vector<storage_t> &GetColumnIds();
Expand Down
1 change: 1 addition & 0 deletions src/optimizer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ add_library_unity(
filter_combiner.cpp
filter_pushdown.cpp
filter_pullup.cpp
heuristic_operator_fusion.cpp
in_clause_rewriter.cpp
optimizer.cpp
expression_rewriter.cpp
Expand Down
45 changes: 44 additions & 1 deletion src/optimizer/filter_combiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "duckdb/planner/filter/constant_filter.hpp"
#include "duckdb/planner/filter/null_filter.hpp"
#include "duckdb/optimizer/optimizer.hpp"
#include <iostream>

namespace duckdb {

Expand Down Expand Up @@ -391,8 +392,19 @@ bool FilterCombiner::HasFilters() {
// return zonemap_checks;
// }

//! Visitor hook for bound column references: remembers which column the reference points at
//! (by its binding's column_index) in referenced_col_ids, and hands the expression itself back.
unique_ptr<Expression> FilterCombiner::VisitReplace(BoundColumnRefExpression &ref,
                                                    unique_ptr<Expression> *expr_ptr) {
	const auto referenced_column = ref.binding.column_index;
	referenced_col_ids.insert(referenced_column);

	// The node is not rewritten: ownership is moved out of *expr_ptr and returned as-is.
	// NOTE(review): presumably the calling visitor stores the returned pointer back in place - confirm.
	unique_ptr<Expression> same_expression = std::move(*expr_ptr);
	return same_expression;
}

TableFilterSet FilterCombiner::GenerateTableScanFilters(vector<idx_t> &column_ids) {
TableFilterSet table_filters;
TableFilterSet table_filters(column_ids.size());
auto combined_filter = make_uniq<BoundConjunctionExpression>(ExpressionType::CONJUNCTION_AND);
// TODO: Improve filter combiner to support more types of comparisions
// TODO: Ask why there is not null check in the filter pushdown

referenced_col_ids.clear();
//! First, we figure the filters that have constant expressions that we can push down to the table scan
for (auto &constant_value : constant_values) {
if (!constant_value.second.empty()) {
Expand All @@ -407,6 +419,8 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(vector<idx_t> &column_id
constant_value.second[0].constant.type().InternalType() == PhysicalType::BOOL)) {
//! Here we check if these filters are column references
filter_exp = equivalence_map.find(constant_value.first);

// To handle single column comparisons
if (filter_exp->second.size() == 1 &&
filter_exp->second[0].get().type == ExpressionType::BOUND_COLUMN_REF) {
auto &filter_col_exp = filter_exp->second[0].get().Cast<BoundColumnRefExpression>();
Expand All @@ -421,6 +435,7 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(vector<idx_t> &column_id
for (idx_t i = 0; i < entries.size(); i++) {
// for each entry also create a comparison with each constant
for (idx_t k = 0; k < constant_list.size(); k++) {
std::cout << "Optimizer::SimpleFilterFusion:" << filter_col_exp.ToString() << " " << ExpressionTypeToString(constant_value.second[k].comparison_type) << " " << constant_value.second[k].constant.ToString() << std::endl;
auto constant_filter = make_uniq<ConstantFilter>(constant_value.second[k].comparison_type,
constant_value.second[k].constant);
table_filters.PushFilter(column_index, std::move(constant_filter));
Expand All @@ -429,9 +444,37 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(vector<idx_t> &column_id
}
equivalence_map.erase(filter_exp);
}
else if (filter_exp->second.size() == 1) {
// TODO: This might not work for all filters expressions so conditionally execute this code
auto equivalence_set = filter_exp->first;
auto &constant_list = constant_values.find(equivalence_set)->second;
auto expr = filter_exp->second[0].get().Copy();
VisitExpression(&expr);
for (idx_t k = 0; k < constant_list.size(); k++) {
auto rhs = make_uniq<BoundConstantExpression>(constant_value.second[k].constant)->Copy();
combined_filter->children.push_back(make_uniq<BoundComparisonExpression>(constant_value.second[k].comparison_type, expr->Copy(), std::move(rhs)));
}
equivalence_map.erase(filter_exp);
}
}
}
}
auto no_complex_filters = combined_filter->children.size();
if (no_complex_filters > 1) {
table_filters.complex_filter = std::move(combined_filter);
} else if (no_complex_filters == 1) {
table_filters.complex_filter = std::move(combined_filter->children[0]);
}
if(table_filters.complex_filter) {
std::cout << "Optimizer:ComplexFilterFusion:" << table_filters.complex_filter->ToString() << std::endl;
}

vector<idx_t> converted_ids;
for(auto col_id: referenced_col_ids) {
converted_ids.push_back(col_id);
}
table_filters.SetColumnsIds(converted_ids);

//! Here we look for LIKE or IN filters
for (idx_t rem_fil_idx = 0; rem_fil_idx < remaining_filters.size(); rem_fil_idx++) {
auto &remaining_filter = remaining_filters[rem_fil_idx];
Expand Down
25 changes: 23 additions & 2 deletions src/optimizer/filter_pushdown.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
#include "duckdb/planner/operator/logical_filter.hpp"
#include "duckdb/planner/operator/logical_join.hpp"
#include "duckdb/optimizer/optimizer.hpp"
#include <iostream>

bool disable_filter_pushdown = false;
bool disable_filter_pushdown_get = true;

namespace duckdb {

Expand All @@ -13,6 +17,18 @@ FilterPushdown::FilterPushdown(Optimizer &optimizer) : optimizer(optimizer), com
}

unique_ptr<LogicalOperator> FilterPushdown::Rewrite(unique_ptr<LogicalOperator> op) {
if (disable_filter_pushdown) {
std::cout << "Optimizer:FilterPushDown:disabled\n";
return std::move(op);
} else {
std::cout << "Optimizer:FilterPushDown:enabled\n";
}
if (disable_filter_pushdown_get) {
std::cout << "Optimizer:FilterFusion:disabled\n";
} else {
std::cout << "Optimizer:FilterFusion:enabled\n";
}

D_ASSERT(!combiner.HasFilters());
switch (op->type) {
case LogicalOperatorType::LOGICAL_AGGREGATE_AND_GROUP_BY:
Expand All @@ -39,8 +55,13 @@ unique_ptr<LogicalOperator> FilterPushdown::Rewrite(unique_ptr<LogicalOperator>
op->children[0] = Rewrite(std::move(op->children[0]));
return op;
}
case LogicalOperatorType::LOGICAL_GET:
return PushdownGet(std::move(op));
case LogicalOperatorType::LOGICAL_GET: {
if (disable_filter_pushdown_get) {
return FinishPushdown(std::move(op));
} else {
return PushdownGet(std::move(op));
}
}
case LogicalOperatorType::LOGICAL_LIMIT:
return PushdownLimit(std::move(op));
default:
Expand Down
18 changes: 18 additions & 0 deletions src/optimizer/heuristic_operator_fusion.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#include "duckdb/optimizer/heuristic_operator_fusion.hpp"

#include "duckdb.hpp"
#include "duckdb/common/types/column/column_data_collection.hpp"
#include "duckdb/optimizer/optimizer_extension.hpp"
#include "duckdb/planner/operator/logical_column_data_get.hpp"
#include "duckdb/planner/operator/logical_get.hpp"
#include "duckdb/common/serializer/binary_serializer.hpp"
#include "duckdb/common/serializer/binary_deserializer.hpp"
#include "duckdb/common/serializer/memory_stream.hpp"
#include <iostream>

using namespace duckdb;

//! Pass-through rewrite: no transformation is applied to the plan.
//! Ownership of the plan is moved out of the `plan` argument and returned to the caller,
//! which leaves the caller's `plan` pointer empty until it is re-assigned.
duckdb::unique_ptr<LogicalOperator> HeuristicOperatorFusion::Rewrite(duckdb::unique_ptr<LogicalOperator> &plan) {
	// `plan` is a reference parameter, so an explicit move is required to take ownership.
	duckdb::unique_ptr<LogicalOperator> untouched_plan = std::move(plan);
	return untouched_plan;
}
11 changes: 11 additions & 0 deletions src/optimizer/optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "duckdb/optimizer/statistics_propagator.hpp"
#include "duckdb/optimizer/topn_optimizer.hpp"
#include "duckdb/optimizer/unnest_rewriter.hpp"
#include "duckdb/optimizer/heuristic_operator_fusion.hpp"
#include "duckdb/planner/binder.hpp"
#include "duckdb/planner/planner.hpp"

Expand Down Expand Up @@ -91,6 +92,11 @@ unique_ptr<LogicalOperator> Optimizer::Optimize(unique_ptr<LogicalOperator> plan
// this does not change the logical plan structure, but only simplifies the expression trees
RunOptimizer(OptimizerType::EXPRESSION_REWRITER, [&]() { rewriter.VisitOperator(*plan); });

RunOptimizer(OptimizerType::HEURISTIC_OPERATOR_FUSION, [&]() {
HeuristicOperatorFusion rewriter;
plan = rewriter.Rewrite(plan);
});

// perform filter pullup
RunOptimizer(OptimizerType::FILTER_PULLUP, [&]() {
FilterPullup filter_pullup;
Expand Down Expand Up @@ -194,6 +200,11 @@ unique_ptr<LogicalOperator> Optimizer::Optimize(unique_ptr<LogicalOperator> plan
plan = expression_heuristics.Rewrite(std::move(plan));
});

RunOptimizer(OptimizerType::HEURISTIC_OPERATOR_FUSION, [&]() {
HeuristicOperatorFusion rewriter;
plan = rewriter.Rewrite(plan);
});

for (auto &optimizer_extension : DBConfig::GetConfig(context).optimizer_extensions) {
RunOptimizer(OptimizerType::EXTENSION, [&]() {
optimizer_extension.optimize_function(context, optimizer_extension.optimizer_info.get(), plan);
Expand Down
1 change: 1 addition & 0 deletions src/optimizer/pushdown/pushdown_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "duckdb/planner/expression/bound_parameter_expression.hpp"
#include "duckdb/planner/operator/logical_filter.hpp"
#include "duckdb/planner/operator/logical_get.hpp"
#include <iostream>

namespace duckdb {

Expand Down
Loading

0 comments on commit e7a3b43

Please sign in to comment.