Skip to content

Commit

Permalink
add lakesoul catalog
Browse files Browse the repository at this point in the history
Signed-off-by: fphantam <[email protected]>
  • Loading branch information
F-PHantam committed Dec 8, 2023
1 parent b51e9ba commit b4985d5
Show file tree
Hide file tree
Showing 15 changed files with 481 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog.external;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalCatalog;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
 * External database backed by a LakeSoul catalog; produces {@link LakeSoulExternalTable} instances.
 */
public class LakeSoulExternalDatabase extends ExternalDatabase<LakeSoulExternalTable> {

    private static final Logger LOG = LogManager.getLogger(LakeSoulExternalDatabase.class);

    /**
     * Creates the database and registers it with the {@code LAKESOUL} init-log type.
     *
     * @param extCatalog the catalog that owns this database
     * @param id the database id
     * @param name the database name
     */
    public LakeSoulExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
        super(extCatalog, id, name, InitDatabaseLog.Type.LAKESOUL);
    }

    /**
     * Builds the table object for {@code tableName} inside this database.
     *
     * @param tableName the table name
     * @param tblId the table id
     * @param catalog the owning catalog; cast to {@link LakeSoulExternalCatalog}
     * @return a new {@link LakeSoulExternalTable}
     */
    @Override
    protected LakeSoulExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
        LakeSoulExternalCatalog lakeSoulCatalog = (LakeSoulExternalCatalog) catalog;
        return new LakeSoulExternalTable(tblId, tableName, name, lakeSoulCatalog);
    }

    /**
     * Returns the tables of this database as a sorted list.
     * Despite the method name, the sort key used here is the table name.
     *
     * @return the tables ordered by {@link TableIf#getName()}
     */
    @Override
    public List<LakeSoulExternalTable> getTablesOnIdOrder() {
        Comparator<LakeSoulExternalTable> byName = Comparator.comparing(TableIf::getName);
        return getTables().stream()
                .sorted(byName)
                .collect(Collectors.toList());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog.external;

import com.dmetasoul.lakesoul.meta.dao.TableInfoDao;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import com.google.common.collect.Lists;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalCatalog;
import org.apache.doris.thrift.TLakeSoulTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import org.apache.spark.sql.execution.arrow.ArrowUtils;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.spark.sql.types.DataTypes.BinaryType;
import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.ByteType;
import static org.apache.spark.sql.types.DataTypes.DateType;
import static org.apache.spark.sql.types.DataTypes.DoubleType;
import static org.apache.spark.sql.types.DataTypes.FloatType;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.ShortType;
import static org.apache.spark.sql.types.DataTypes.StringType;
import static org.apache.spark.sql.types.DataTypes.TimestampType;


/**
 * External table whose metadata is read from a {@link LakeSoulExternalCatalog}.
 *
 * <p>The LakeSoul table schema is stored as an Arrow schema in JSON form. It is converted to a
 * Spark {@link StructType} and then mapped field by field to Doris column types.
 */
public class LakeSoulExternalTable extends ExternalTable {

    /** Scale passed to {@code createDatetimeV2Type} for every LakeSoul timestamp column. */
    public static final int LAKESOUL_TIMESTAMP_SCALE_MS = 6;

    /**
     * @param id the table id
     * @param name the table name
     * @param dbName the name of the database containing the table
     * @param catalog the owning LakeSoul catalog
     */
    public LakeSoulExternalTable(long id, String name, String dbName, LakeSoulExternalCatalog catalog) {
        super(id, name, catalog, dbName, TableType.LAKESOUl_EXTERNAL_TABLE);
    }

    /**
     * Loads the table schema from the catalog and converts it to Doris columns.
     *
     * @return one column per top-level field of the LakeSoul schema, in schema order
     * @throws RuntimeException if the stored schema is not an Arrow-kind schema, or if the
     *         schema JSON cannot be parsed
     * @throws IllegalArgumentException if a field has a type with no Doris mapping
     */
    @Override
    public List<Column> initSchema() {
        String tableSchema = getLakeSoulTableInfo().getTableSchema();
        if (!TableInfoDao.isArrowKindSchema(tableSchema)) {
            throw new RuntimeException("please upgrade lakesoul-common to latest version");
        }

        StructType schema;
        try {
            schema = ArrowUtils.fromArrowSchema(Schema.fromJSON(tableSchema));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        List<Column> columns = Lists.newArrayListWithCapacity(schema.size());
        for (StructField field : schema.fields()) {
            columns.add(new Column(field.name(), lakeSoulTypeToDorisType(field.dataType()), field.nullable()));
        }
        return columns;
    }

    /**
     * Maps a Spark SQL data type (as produced from the LakeSoul Arrow schema) to a Doris type.
     * Struct types are converted recursively.
     *
     * @param dt the Spark data type
     * @return the corresponding Doris type
     * @throws IllegalArgumentException if {@code dt} has no mapping
     */
    private Type lakeSoulTypeToDorisType(DataType dt) {
        if (dt.equals(BooleanType)) {
            return Type.BOOLEAN;
        } else if (dt.equals(ByteType)) {
            return Type.TINYINT;
        } else if (dt.equals(ShortType)) {
            // Must compare against Spark's ShortType; comparing a Spark DataType with the
            // Doris Type.SMALLINT constant can never match.
            return Type.SMALLINT;
        } else if (dt.equals(IntegerType)) {
            return Type.INT;
        } else if (dt.equals(LongType)) {
            return Type.BIGINT;
        } else if (dt.equals(FloatType)) {
            return Type.FLOAT;
        } else if (dt.equals(DoubleType)) {
            return Type.DOUBLE;
        } else if (dt.equals(StringType) || dt.equals(BinaryType)) {
            return Type.STRING;
        } else if (dt instanceof DecimalType) {
            DecimalType decimalType = (DecimalType) dt;
            return ScalarType.createDecimalV3Type(decimalType.precision(), decimalType.scale());
        } else if (dt.equals(DateType)) {
            return ScalarType.createDateV2Type();
        } else if (dt.equals(TimestampType)) {
            return ScalarType.createDatetimeV2Type(LAKESOUL_TIMESTAMP_SCALE_MS);
        } else if (dt instanceof StructType) {
            ArrayList<org.apache.doris.catalog.StructField> fields = new ArrayList<>();
            for (StructField structField : ((StructType) dt).fields()) {
                fields.add(new org.apache.doris.catalog.StructField(
                        structField.name(), lakeSoulTypeToDorisType(structField.dataType())));
            }
            return new org.apache.doris.catalog.StructType(fields);
        }
        throw new IllegalArgumentException("Cannot transform unknown type: " + dt);
    }

    /**
     * Builds the thrift descriptor for this table, attaching a {@link TLakeSoulTable} created
     * with the database name, table name and an empty property map.
     *
     * @return the thrift table descriptor
     */
    @Override
    public TTableDescriptor toThrift() {
        List<Column> schema = getFullSchema();
        TLakeSoulTable tLakeSoulTable = new TLakeSoulTable(dbName, name, new HashMap<>());
        // NOTE(review): the descriptor is tagged HIVE_TABLE rather than a LakeSoul-specific
        // type - confirm this is intentional.
        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0,
                getName(), dbName);
        tTableDescriptor.setLakesoulTable(tLakeSoulTable);
        return tTableDescriptor;
    }

    /**
     * @return the LakeSoul {@link TableInfo} the catalog returns for this table
     */
    public TableInfo getLakeSoulTableInfo() {
        return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name);
    }

    /**
     * @return the table path recorded in the LakeSoul table info
     */
    public String tablePath() {
        return getLakeSoulTableInfo().getTablePath();
    }

    /**
     * @return the Hadoop properties held by the catalog's property object
     */
    public Map<String, String> getHadoopProperties() {
        return catalog.getCatalogProperty().getHadoopProperties();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalCatalogFactory;
import org.apache.doris.datasource.test.TestExternalCatalog;

Expand Down Expand Up @@ -130,6 +131,8 @@ private static CatalogIf createCatalog(long catalogId, String name, String resou
case "max_compute":
catalog = new MaxComputeExternalCatalog(catalogId, name, resource, props, comment);
break;
case "lakesoul":
    catalog = new LakeSoulExternalCatalog(catalogId, name, resource, props, comment);
    // Stop here: without this break, control falls through into the "test" case below,
    // which throws DdlException whenever FE is not running unit tests.
    break;
case "test":
if (!FeConstants.runningUnitTest) {
throw new DdlException("test catalog is only for FE unit test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.IcebergExternalDatabase;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.catalog.external.LakeSoulExternalDatabase;
import org.apache.doris.catalog.external.MaxComputeExternalDatabase;
import org.apache.doris.catalog.external.PaimonExternalDatabase;
import org.apache.doris.catalog.external.TestExternalDatabase;
Expand Down Expand Up @@ -492,6 +493,8 @@ protected ExternalDatabase<? extends ExternalTable> getDbForInit(String dbName,
return new MaxComputeExternalDatabase(this, dbId, dbName);
//case HUDI:
//return new HudiExternalDatabase(this, dbId, dbName);
case LAKESOUL:
return new LakeSoulExternalDatabase(this, dbId, dbName);
case TEST:
return new TestExternalDatabase(this, dbId, dbName);
case PAIMON:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public enum Type {
MAX_COMPUTE,
HUDI,
PAIMON,
LAKESOUL,
TEST,
UNKNOWN;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.lakesoul;

import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * External catalog that answers database and table metadata queries through a LakeSoul
 * {@link DBManager}.
 */
public class LakeSoulExternalCatalog extends ExternalCatalog {

    private static final Logger LOG = LogManager.getLogger(LakeSoulExternalCatalog.class);

    private final DBManager dbManager;

    /**
     * Creates the catalog, converting the user properties with
     * {@link PropertyConverter#convertToMetaProperties} before storing them.
     *
     * @param catalogId the catalog id
     * @param name the catalog name
     * @param resource the resource name handed to {@link CatalogProperty}
     * @param props the catalog properties as supplied by the caller
     * @param comment the catalog comment
     */
    public LakeSoulExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
            String comment) {
        super(catalogId, name, InitCatalogLog.Type.LAKESOUL, comment);
        Map<String, String> metaProps = PropertyConverter.convertToMetaProperties(props);
        dbManager = new DBManager();
        catalogProperty = new CatalogProperty(resource, metaProps);
    }

    /**
     * @return the namespaces reported by the LakeSoul {@link DBManager}
     */
    @Override
    protected List<String> listDatabaseNames() {
        return dbManager.listNamespaces();
    }

    /**
     * Lists the names of all tables LakeSoul reports for the given database; no filtering
     * is applied to the returned table infos.
     *
     * @param ctx the session context (not used here)
     * @param dbName the database name; passed through {@code getRealTableName} before lookup
     * @return the table names, in the order returned by the {@link DBManager}
     */
    @Override
    public List<String> listTableNames(SessionContext ctx, String dbName) {
        String namespace = getRealTableName(dbName);
        List<TableInfo> tableInfos = dbManager.getTableInfosByNamespace(namespace);
        List<String> names = new ArrayList<>(100);
        for (TableInfo info : tableInfos) {
            names.add(info.getTableName());
        }
        return names;
    }

    /**
     * @param ctx the session context (not used here)
     * @param dbName the database name; passed through {@code getRealTableName} before lookup
     * @param tblName the table name
     * @return {@code true} if the {@link DBManager} returns a non-null table info
     */
    @Override
    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
        String namespace = getRealTableName(dbName);
        return dbManager.getTableInfoByNameAndNamespace(namespace, tblName) != null;
    }

    /** No local objects are initialized for this catalog. */
    @Override
    protected void initLocalObjectsImpl() {
    }

    /**
     * Looks up the LakeSoul table info after making sure the catalog is initialized.
     * Unlike {@link #tableExist}, {@code dbName} is passed to the {@link DBManager} as given.
     *
     * @param dbName the namespace to look in
     * @param tblName the table name
     * @return whatever the {@link DBManager} returns for this namespace and table name
     */
    public TableInfo getLakeSoulTable(String dbName, String tblName) {
        makeSureInitialized();
        return dbManager.getTableInfoByNameAndNamespace(dbName, tblName);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.catalog.external.JdbcExternalTable;
import org.apache.doris.catalog.external.LakeSoulExternalTable;
import org.apache.doris.catalog.external.MaxComputeExternalTable;
import org.apache.doris.catalog.external.PaimonExternalTable;
import org.apache.doris.common.UserException;
Expand Down Expand Up @@ -175,6 +176,7 @@
import org.apache.doris.planner.external.hudi.HudiScanNode;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.planner.external.jdbc.JdbcScanNode;
import org.apache.doris.planner.external.lakesoul.LakeSoulScanNode;
import org.apache.doris.planner.external.paimon.PaimonScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticConstants;
Expand Down Expand Up @@ -479,6 +481,9 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
scanNode = new PaimonScanNode(fileScan.translatePlanNodeId(), tupleDescriptor, false);
} else if (table instanceof MaxComputeExternalTable) {
scanNode = new MaxComputeScanNode(fileScan.translatePlanNodeId(), tupleDescriptor, false);
} else if (table instanceof LakeSoulExternalTable) {
scanNode = new LakeSoulScanNode(fileScan.translatePlanNodeId(), tupleDescriptor, false);
((LakeSoulScanNode) scanNode).setSelectedPartitions(fileScan.getSelectedPartitions());
} else {
throw new RuntimeException("do not support table type " + table.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.doris.planner.external.hudi.HudiScanNode;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.planner.external.jdbc.JdbcScanNode;
import org.apache.doris.planner.external.lakesoul.LakeSoulScanNode;
import org.apache.doris.planner.external.odbc.OdbcScanNode;
import org.apache.doris.planner.external.paimon.PaimonScanNode;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -1946,6 +1947,8 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
case JDBC_EXTERNAL_TABLE:
scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
break;
case LAKESOUl_EXTERNAL_TABLE:
    scanNode = new LakeSoulScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
    // Stop here: without this break, control falls through into TEST_EXTERNAL_TABLE below,
    // which replaces scanNode with a TestExternalTableScanNode.
    break;
case TEST_EXTERNAL_TABLE:
scanNode = new TestExternalTableScanNode(ctx.getNextNodeId(), tblRef.getDesc());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.doris.planner.external.hudi.HudiSplit;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.planner.external.iceberg.IcebergSplit;
import org.apache.doris.planner.external.lakesoul.LakeSoulScanNode;
import org.apache.doris.planner.external.lakesoul.LakeSoulSplit;
import org.apache.doris.planner.external.paimon.PaimonScanNode;
import org.apache.doris.planner.external.paimon.PaimonSplit;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -367,6 +369,8 @@ public void createScanRangeLocations() throws UserException {
HudiScanNode.setHudiParams(rangeDesc, (HudiSplit) fileSplit);
} else if (fileSplit instanceof MaxComputeSplit) {
MaxComputeScanNode.setScanParams(rangeDesc, (MaxComputeSplit) fileSplit);
} else if (fileSplit instanceof LakeSoulSplit) {
LakeSoulScanNode.setLakeSoulParams(rangeDesc, (LakeSoulSplit) fileSplit);
}

curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public enum TableFormatType {
HUDI("hudi"),
PAIMON("paimon"),
MAX_COMPUTE("max_compute"),
TRANSACTIONAL_HIVE("transactional_hive");
TRANSACTIONAL_HIVE("transactional_hive"),
LAKESOUL("lakesoul");

private final String tableFormatType;

Expand Down
Loading

0 comments on commit b4985d5

Please sign in to comment.