[Metadata] Replace table_info.table_schema with Arrow kind schema (Backward Compatibility) #354

Merged: 1 commit, merged on Oct 19, 2023
@@ -34,7 +34,8 @@ public TableInfo selectByTableId(String tableId) {
String sql = String.format("select * from table_info where table_id = '%s'", tableId);
return getTableInfo(sql);
}
public List<TableInfo> selectByNamespace(String namespace){

public List<TableInfo> selectByNamespace(String namespace) {
String sql = String.format("select * from table_info where table_namespace='%s'", namespace);
return getTableInfos(sql);
}
@@ -105,6 +106,7 @@ private TableInfo getTableInfo(String sql) {
}
return tableInfo;
}

private List<TableInfo> getTableInfos(String sql) {
Connection conn = null;
PreparedStatement pstmt = null;
@@ -284,4 +286,12 @@ public static TableInfo tableInfoFromResultSet(ResultSet rs) throws SQLException
.setDomain(rs.getString("domain"))
.build();
}

public static boolean isArrowKindSchema(String schema) {
return schema.charAt(schema.indexOf('"') + 1) == 'f';
}

public static boolean isSparkKindSchema(String schema) {
return schema.charAt(schema.indexOf('"') + 1) == 't';
}
}
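
The two new predicates lean on the first JSON key of the stored schema string: Spark's `StructType.json()` output opens with the `"type"` key, while Arrow's `Schema.toJson()` output opens with `"fields"`, so the character right after the first double quote is enough to tell the two encodings apart. Below is a minimal sketch of the check; the JSON fragments are hand-written placeholders, not values produced by this repository.

```java
import com.dmetasoul.lakesoul.meta.dao.TableInfoDao;

public class SchemaKindCheckSketch {
    public static void main(String[] args) {
        // Illustrative fragments only.
        String sparkKind = "{\"type\":\"struct\",\"fields\":[]}"; // Spark StructType JSON
        String arrowKind = "{\"fields\":[],\"metadata\":[]}";     // Arrow Schema JSON

        // 't' follows the first '"' in the Spark form, 'f' in the Arrow form.
        System.out.println(TableInfoDao.isSparkKindSchema(sparkKind)); // true
        System.out.println(TableInfoDao.isArrowKindSchema(arrowKind)); // true
    }
}
```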
@@ -124,7 +124,7 @@ public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean
List<String> tables = listTables(databaseName);
if (!tables.isEmpty()) {
if (cascade) {
for (String table: tables) {
for (String table : tables) {
try {
dropTable(new ObjectPath(databaseName, table), true);
} catch (TableNotExistException e) {
@@ -207,7 +207,7 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
dbManager.deleteShortTableName(tableInfo.getTableName(), tableName, tablePath.getDatabaseName());
dbManager.deleteDataCommitInfo(tableId);
dbManager.deletePartitionInfoByTableId(tableId);
if(FlinkUtil.isTable(tableInfo)){
if (FlinkUtil.isTable(tableInfo)) {
Path path = new Path(tableInfo.getTablePath());
try {
path.getFileSystem().delete(path, true);
@@ -274,7 +274,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
String tableId = TABLE_ID_PREFIX + UUID.randomUUID();
String qualifiedPath = "";
String sparkSchema = FlinkUtil.toSparkSchema(schema, cdcColumn).json();
String sparkSchema = FlinkUtil.toArrowSchema(schema, cdcColumn).toJson();
List<String> partitionKeys = Collections.emptyList();
if (table instanceof ResolvedCatalogTable) {
partitionKeys = ((ResolvedCatalogTable) table).getPartitionKeys();
@@ -284,7 +284,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
} else {
String flinkWarehouseDir = GlobalConfiguration.loadConfiguration().get(FLINK_WAREHOUSE_DIR);
if (null != flinkWarehouseDir) {
path = String.join("/", flinkWarehouseDir, tablePath.getDatabaseName(), tablePath.getObjectName());
path = String.join("/", flinkWarehouseDir, tablePath.getDatabaseName(), tablePath.getObjectName());
}
}
try {
@@ -298,9 +298,9 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
if (table instanceof ResolvedCatalogView) {
tableOptions.put(LAKESOUL_VIEW.key(), "true");
tableOptions.put(LAKESOUL_VIEW_TYPE.key(),LAKESOUL_VIEW_TYPE.defaultValue());
tableOptions.put(VIEW_ORIGINAL_QUERY,((ResolvedCatalogView) table).getOriginalQuery());
tableOptions.put(VIEW_EXPANDED_QUERY,((ResolvedCatalogView) table).getExpandedQuery());
tableOptions.put(LAKESOUL_VIEW_TYPE.key(), LAKESOUL_VIEW_TYPE.defaultValue());
tableOptions.put(VIEW_ORIGINAL_QUERY, ((ResolvedCatalogView) table).getOriginalQuery());
tableOptions.put(VIEW_EXPANDED_QUERY, ((ResolvedCatalogView) table).getExpandedQuery());
}
String json = JSON.toJSONString(tableOptions);
JSONObject properties = JSON.parseObject(json);
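
With this change, `createTable` persists the Arrow JSON form of the schema instead of the Spark `StructType` JSON. A rough sketch of the resulting write/read round trip follows, assuming a resolved `TableSchema` and an optional CDC column name as inputs; the class and method names are placeholders, not code from the PR.

```java
import java.io.IOException;
import java.util.Optional;

import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.table.api.TableSchema;

class ArrowSchemaPersistSketch {
    // What gets written to table_info.table_schema now; previously this was
    // FlinkUtil.toSparkSchema(tableSchema, cdcColumn).json().
    static String toStoredJson(TableSchema tableSchema, Optional<String> cdcColumn) {
        Schema arrowSchema = FlinkUtil.toArrowSchema(tableSchema, cdcColumn);
        return arrowSchema.toJson(); // Arrow-kind JSON, begins with {"fields": ...}
    }

    // Reading the stored value back goes through Arrow's own JSON parser,
    // as toFlinkCatalog does further down in this PR.
    static Schema fromStoredJson(String tableSchemaJson) throws IOException {
        return Schema.fromJSON(tableSchemaJson);
    }
}
```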
@@ -9,7 +9,9 @@
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DBConfig;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.dao.TableInfoDao;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
@@ -19,6 +21,7 @@
import org.apache.flink.lakesoul.sink.writer.AbstractLakeSoulMultiTableSinkWriter;
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.lakesoul.types.TableSchemaIdentity;
import org.apache.spark.sql.arrow.ArrowUtils;
import org.apache.spark.sql.arrow.DataTypeCastUtils;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
@@ -116,9 +119,11 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
String tableName = identity.tableId.table();
String tableNamespace = identity.tableId.schema();
boolean isCdc = identity.useCDC;
StructType msgSchema = FlinkUtil.toSparkSchema(identity.rowType, isCdc ? Optional.of(
Schema msgSchema = FlinkUtil.toArrowSchema(identity.rowType, isCdc ? Optional.of(
identity.cdcColumn) :
Optional.empty());
StructType sparkSchema = ArrowUtils.fromArrowSchema(msgSchema);

TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(tableName, tableNamespace);
LOG.info("Committing: {}, {}, {}, {} {}", tableNamespace, tableName, isCdc, msgSchema, tableInfo);
if (tableInfo == null) {
@@ -137,7 +142,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
properties.put(CDC_CHANGE_COLUMN, CDC_CHANGE_COLUMN_DEFAULT);
}
}
dbManager.createNewTable(tableId, tableNamespace, tableName, identity.tableLocation, msgSchema.json(),
dbManager.createNewTable(tableId, tableNamespace, tableName, identity.tableLocation, msgSchema.toJson(),
properties, partition);
} else {
DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
@@ -149,11 +154,17 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
!new HashSet<>(partitionKeys.rangeKeys).containsAll(identity.partitionKeyList)) {
throw new IOException("Change of partition key column of table " + tableName + " is forbidden");
}
StructType origSchema = (StructType) StructType.fromJson(tableInfo.getTableSchema());
StructType origSchema = null;
if (TableInfoDao.isArrowKindSchema(tableInfo.getTableSchema())) {
Schema arrowSchema = Schema.fromJSON(tableInfo.getTableSchema());
origSchema = ArrowUtils.fromArrowSchema(arrowSchema);
} else {
origSchema = (StructType) StructType.fromJson(tableInfo.getTableSchema());
}
scala.Tuple3<String, Object, StructType>
equalOrCanCastTuple3 =
DataTypeCastUtils.checkSchemaEqualOrCanCast(origSchema,
msgSchema,
ArrowUtils.fromArrowSchema(msgSchema),
identity.partitionKeyList,
identity.primaryKeys);
String equalOrCanCast = equalOrCanCastTuple3._1();
@@ -162,17 +173,17 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
if (equalOrCanCast.equals(DataTypeCastUtils.CAN_CAST())) {
LOG.warn("Schema change found, origin schema = {}, changed schema = {}",
origSchema.json(),
msgSchema.json());
msgSchema.toJson());
if (logicallyDropColumn) {
List<String> droppedColumn = DataTypeCastUtils.getDroppedColumn(origSchema, msgSchema);
List<String> droppedColumn = DataTypeCastUtils.getDroppedColumn(origSchema, sparkSchema);
if (droppedColumn.size() > 0) {
LOG.warn("Dropping Column {} Logically", droppedColumn);
dbManager.logicallyDropColumn(tableInfo.getTableId(), droppedColumn);
if (schemaChanged) {
dbManager.updateTableSchema(tableInfo.getTableId(), mergeStructType.json());
}
} else {
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.json());
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.toJson());
}
} else {
LOG.info("Changing table schema: {}, {}, {}, {}, {}, {}",
@@ -182,7 +193,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
msgSchema,
identity.useCDC,
identity.cdcColumn);
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.json());
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.toJson());
if (JSONObject.parseObject(tableInfo.getProperties()).containsKey(DBConfig.TableInfoProperty.DROPPED_COLUMN)) {
dbManager.removeLogicallyDropColumn(tableInfo.getTableId());
}
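
The committer stays backward compatible on the read side: rows written before this PR still carry Spark `StructType` JSON, new rows carry Arrow `Schema` JSON, and both are normalized to a Spark `StructType` before the cast check. A condensed sketch of that branch, factored into a standalone helper purely for illustration:

```java
import java.io.IOException;

import com.dmetasoul.lakesoul.meta.dao.TableInfoDao;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.arrow.ArrowUtils;
import org.apache.spark.sql.types.StructType;

class StoredSchemaReadSketch {
    // Mirrors the branch added to commit() above.
    static StructType toStructType(String tableSchemaJson) throws IOException {
        if (TableInfoDao.isArrowKindSchema(tableSchemaJson)) {
            Schema arrowSchema = Schema.fromJSON(tableSchemaJson); // new Arrow-kind rows
            return ArrowUtils.fromArrowSchema(arrowSchema);
        }
        return (StructType) StructType.fromJson(tableSchemaJson);  // legacy Spark-kind rows
    }
}
```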
@@ -8,7 +8,10 @@
import com.alibaba.fastjson.JSONObject;
import com.dmetasoul.lakesoul.lakesoul.io.NativeIOBase;
import com.dmetasoul.lakesoul.meta.*;
import com.dmetasoul.lakesoul.meta.dao.TableInfoDao;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
@@ -30,6 +33,7 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.permission.FsAction;
@@ -72,6 +76,75 @@ public static String getRangeValue(CatalogPartitionSpec cps) {
return "Null";
}

public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(RowType rowType, Optional<String> cdcColumn) throws CatalogException {
List<Field> fields = new ArrayList<>();
String cdcColName = null;
if (cdcColumn.isPresent()) {
cdcColName = cdcColumn.get();
Field cdcField = ArrowUtils.toArrowField(cdcColName, new VarCharType(false, 16));
fields.add(cdcField);
}

for (RowType.RowField field : rowType.getFields()) {
String name = field.getName();
if (name.equals(SORT_FIELD)) continue;

LogicalType logicalType = field.getType();
Field arrowField = ArrowUtils.toArrowField(name, logicalType);
if (name.equals(cdcColName)) {
if (!arrowField.toString().equals(fields.get(0).toString())) {
throw new CatalogException(CDC_CHANGE_COLUMN +
"=" +
cdcColName +
"has an invalid field of" +
field +
"," +
CDC_CHANGE_COLUMN +
" require field of " +
fields.get(0).toString());
}
} else {
fields.add(arrowField);
}
}
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}

public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(TableSchema tsc, Optional<String> cdcColumn) throws CatalogException {
List<Field> fields = new ArrayList<>();
String cdcColName = null;
if (cdcColumn.isPresent()) {
cdcColName = cdcColumn.get();
Field cdcField = ArrowUtils.toArrowField(cdcColName, new VarCharType(false, 16));
fields.add(cdcField);
}

for (int i = 0; i < tsc.getFieldCount(); i++) {
String name = tsc.getFieldName(i).get();
DataType dt = tsc.getFieldDataType(i).get();
if (name.equals(SORT_FIELD)) continue;

LogicalType logicalType = dt.getLogicalType();
Field arrowField = ArrowUtils.toArrowField(name, logicalType);
if (name.equals(cdcColName)) {
if (!arrowField.toString().equals(fields.get(0).toString())) {
throw new CatalogException(CDC_CHANGE_COLUMN +
"=" +
cdcColName +
"has an invalid field of" +
arrowField +
"," +
CDC_CHANGE_COLUMN +
" require field of " +
fields.get(0).toString());
}
} else {
fields.add(arrowField);
}
}
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}

public static StructType toSparkSchema(RowType rowType, Optional<String> cdcColumn) throws CatalogException {
StructType stNew = new StructType();

@@ -220,10 +293,18 @@ public static CatalogBaseTable toFlinkCatalog(TableInfo tableInfo) {
String tableSchema = tableInfo.getTableSchema();
JSONObject properties = JSON.parseObject(tableInfo.getProperties());

StructType struct = (StructType) org.apache.spark.sql.types.DataType.fromJson(tableSchema);
org.apache.arrow.vector.types.pojo.Schema
arrowSchema =
org.apache.spark.sql.arrow.ArrowUtils.toArrowSchema(struct, ZoneId.of("UTC").toString());
org.apache.arrow.vector.types.pojo.Schema arrowSchema = null;
System.out.println(tableSchema);
if (TableInfoDao.isArrowKindSchema(tableSchema)) {
try {
arrowSchema = org.apache.arrow.vector.types.pojo.Schema.fromJSON(tableSchema);
} catch (IOException e) {
throw new CatalogException(e);
}
} else {
StructType struct = (StructType) org.apache.spark.sql.types.DataType.fromJson(tableSchema);
arrowSchema = org.apache.spark.sql.arrow.ArrowUtils.toArrowSchema(struct, ZoneId.of("UTC").toString());
}
RowType rowType = ArrowUtils.fromArrowSchema(arrowSchema);
Builder bd = Schema.newBuilder();

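
The new `toArrowSchema` overloads follow the same shape as the existing `toSparkSchema` ones: an optional CDC column is prepended as a non-nullable VARCHAR(16) field, the internal SORT_FIELD is skipped, and the remaining columns are converted field by field. A usage sketch with a hypothetical row type; "rowKinds" merely stands in for whatever CDC column name a job configures.

```java
import java.util.Optional;

import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

class ToArrowSchemaSketch {
    public static void main(String[] args) {
        // Hypothetical two-column row type.
        RowType rowType = RowType.of(
                new LogicalType[]{new BigIntType(false), new VarCharType(20)},
                new String[]{"user_id", "name"});

        // With a CDC column, the schema gains a leading non-nullable VARCHAR(16) field.
        Schema withCdc = FlinkUtil.toArrowSchema(rowType, Optional.of("rowKinds"));
        Schema withoutCdc = FlinkUtil.toArrowSchema(rowType, Optional.empty());

        System.out.println(withCdc.toJson());
        System.out.println(withoutCdc.toJson());
    }
}
```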
@@ -16,6 +16,7 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.spark.sql.arrow.ArrowUtils;
import org.apache.spark.sql.types.StructType;
import org.assertj.core.api.Assertions;
import org.junit.Before;
@@ -82,8 +83,7 @@ public void createTable() {
tEnvs.executeSql("show tables").print();
TableInfo info = DbManage.getTableInfoByNameAndNamespace("user_behaviorgg", "test_lakesoul_meta");
assertEquals(info.getTableSchema(),
new StructType().add("user_id", LongType, false).add("dt", StringType).add("name", StringType)
.json());
ArrowUtils.toArrowSchema(new StructType().add("user_id", LongType, false).add("dt", StringType).add("name", StringType), "UTC").toJson());
tEnvs.executeSql("DROP TABLE user_behaviorgg");
}

@@ -94,11 +94,11 @@ public void createTableWithLike() {
"'lakesoul_meta_host_port'='9043', 'path'='/tmp/user_behaviorgg', 'use_cdc'='true')");

TableInfo info = DbManage.getTableInfoByNameAndNamespace("user_behaviorgg", "test_lakesoul_meta");
Assertions.assertThat(info.getTableSchema()).isEqualTo(new StructType().add("user_id", LongType, false).add("dt", StringType).add("name", StringType, false).json());
Assertions.assertThat(info.getTableSchema()).isEqualTo(ArrowUtils.toArrowSchema(new StructType().add("name", StringType, false).add("user_id", LongType, false).add("dt", StringType), "UTC").toJson());

tEnvs.executeSql("CREATE TABLE if not exists like_table with ('path'='/tmp/like_table') like user_behaviorgg");
TableInfo info2 = DbManage.getTableInfoByNameAndNamespace("like_table", "test_lakesoul_meta");
Assertions.assertThat(info2.getTableSchema()).isEqualTo(new StructType().add("user_id", LongType, false).add("dt", StringType).add("name", StringType, false).json());
Assertions.assertThat(info2.getTableSchema()).isEqualTo(ArrowUtils.toArrowSchema(new StructType().add("name", StringType, false).add("user_id", LongType, false).add("dt", StringType), "UTC").toJson());
Assertions.assertThat(JSON.parseObject(info.getProperties()).get("lakesoul_cdc_change_column")).isEqualTo(JSON.parseObject(info2.getProperties()).get("lakesoul_cdc_change_column"));
Assertions.assertThat(JSON.parseObject(info.getProperties()).get("path")).isEqualTo("/tmp/user_behaviorgg");
Assertions.assertThat(JSON.parseObject(info2.getProperties()).get("path")).isEqualTo("/tmp/like_table");