Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix random table broker load fails when table has schema change #53041

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const {
pindex->set_id(index_id);
pindex->set_schema_hash(schema_hash);
pindex->set_schema_id(schema_id);
pindex->set_is_shadow(is_shadow);
for (auto slot : slots) {
pindex->add_columns(slot->col_name());
}
Expand Down Expand Up @@ -128,6 +129,13 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
index->column_to_expr_value.insert({entry.first, entry.second});
}

if (p_index.has_is_shadow()) {
index->is_shadow = p_index.is_shadow();
if (index->is_shadow) {
_shadow_indexes++;
}
}

_indexes.emplace_back(index);
}

Expand Down Expand Up @@ -187,6 +195,12 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema, RuntimeS
index->column_to_expr_value.insert({entry.first, entry.second});
}
}
if (t_index.__isset.is_shadow) {
index->is_shadow = t_index.is_shadow;
if (index->is_shadow) {
_shadow_indexes++;
}
}
_indexes.emplace_back(index);
}

Expand Down Expand Up @@ -477,10 +491,11 @@ Status OlapTablePartitionParam::add_partitions(const std::vector<TOlapTableParti

part->num_buckets = t_part.num_buckets;
auto num_indexes = _schema->indexes().size();
if (t_part.indexes.size() != num_indexes) {
if (t_part.indexes.size() != num_indexes - _schema->shadow_index_size()) {
std::stringstream ss;
ss << "number of partition's index is not equal with schema's"
<< ", num_part_indexes=" << t_part.indexes.size() << ", num_schema_indexes=" << num_indexes;
<< ", num_part_indexes=" << t_part.indexes.size() << ", num_schema_indexes=" << num_indexes
<< ", num_shadow_indexes=" << _schema->shadow_index_size();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
Expand All @@ -490,16 +505,25 @@ Status OlapTablePartitionParam::add_partitions(const std::vector<TOlapTableParti
return lhs.index_id < rhs.index_id;
});
// check index
for (int j = 0; j < num_indexes; ++j) {
if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) {
// If an add_partition operation is executed during the ALTER process, the ALTER operation will be canceled first.
// Therefore, the latest indexes will not include shadow indexes.
// However, the schema's index may still contain shadow indexes, so these shadow indexes need to be ignored.
int j = 0;
for (int i = 0; i < num_indexes; ++i) {
if (_schema->indexes()[i]->is_shadow) {
continue;
}
if (part->indexes[j].index_id != _schema->indexes()[i]->index_id) {
std::stringstream ss;
ss << "partition's index is not equal with schema's"
<< ", part_index=" << part->indexes[j].index_id
<< ", schema_index=" << _schema->indexes()[j]->index_id;
<< ", schema_index=" << _schema->indexes()[i]->index_id;
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
j++;
}

_partitions.emplace(part->id, part);
if (t_part.__isset.in_keys) {
for (auto& in_key : part->in_keys) {
sevev marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct OlapTableIndexSchema {
OlapTableColumnParam* column_param;
ExprContext* where_clause = nullptr;
std::map<std::string, std::string> column_to_expr_value;
bool is_shadow = false;

void to_protobuf(POlapTableIndexSchema* pindex) const;
};
Expand Down Expand Up @@ -81,6 +82,7 @@ class OlapTableSchemaParam {
return _proto_schema;
}

int64_t shadow_index_size() const { return _shadow_indexes; }
std::string debug_string() const;

private:
Expand All @@ -92,6 +94,8 @@ class OlapTableSchemaParam {
mutable POlapTableSchemaParam* _proto_schema = nullptr;
std::vector<OlapTableIndexSchema*> _indexes;
mutable ObjectPool _obj_pool;

int64_t _shadow_indexes = 0;
};

using OlapTableIndexTablets = TOlapTableIndexTablets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ public static TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Tup
.stream()
.map(column -> column.isShadowColumn() ? column.getName() : column.getColumnId().getId())
.collect(Collectors.toList()));
boolean isShadow = indexMeta.getSchema().stream().anyMatch(column -> column.isShadowColumn());
for (Column column : indexMeta.getSchema()) {
TColumn tColumn = column.toThrift();
tColumn.setColumn_name(column.getColumnId().getId());
Expand All @@ -400,6 +401,7 @@ public static TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Tup
indexSchema.setColumn_param(columnParam);
indexSchema.setSchema_id(indexMeta.getSchemaId());
indexSchema.setColumn_to_expr_value(columnToExprValue);
indexSchema.setIs_shadow(isShadow);
schemaParam.addToIndexes(indexSchema);
if (indexMeta.getWhereClause() != null) {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/descriptors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ message POlapTableIndexSchema {
optional POlapTableColumnParam column_param = 4;
optional int64 schema_id = 5;
map<string, string> column_to_expr_value = 6;
optional bool is_shadow = 7;
};

message POlapTableSchemaParam {
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/Descriptors.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ struct TOlapTableIndexSchema {
5: optional Exprs.TExpr where_clause
6: optional i64 schema_id // schema id
7: optional map<string, string> column_to_expr_value
8: optional bool is_shadow
}

struct TOlapTableSchemaParam {
Expand Down
Loading