Skip to content

Commit

Permalink
cleanup redundant DataOperation (#346)
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
  • Loading branch information
Ceng23333 and zenghua authored Oct 10, 2023
1 parent c3d9afb commit 3fa3a9a
Show file tree
Hide file tree
Showing 38 changed files with 174 additions and 869 deletions.
34 changes: 34 additions & 0 deletions lakesoul-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,28 @@ SPDX-License-Identifier: Apache-2.0
</execution>
</executions>
</plugin>
<!-- Compile Scala sources in this module (mixed Scala/Java build).
     scala-compile-first is bound to process-resources so Scala classes
     are compiled before the Java compiler runs and are visible to it;
     scala-test-compile does the same for test sources.
     NOTE(review): version 4.6.3 is hard-coded here — confirm it matches
     the plugin version used by the other modules in this reactor. -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.6.3</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down Expand Up @@ -212,6 +234,18 @@ SPDX-License-Identifier: Apache-2.0
<classifier>native</classifier>
</dependency>

<!-- Guava pinned to 32.0.0-jre — presumably chosen to include the
     CVE-2023-2976 temp-directory fix; TODO confirm intended version. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.0.0-jre</version>
</dependency>
<!-- Hadoop client API; scope is parameterized via ${local.scope} so it
     can be switched (e.g. to 'provided' on a cluster) per build profile —
     confirm the property's values in the parent pom. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<version>3.3.2</version>
<scope>${local.scope}</scope>
</dependency>

</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
package com.dmetasoul.lakesoul.meta

import com.dmetasoul.lakesoul.meta.entity.DataCommitInfo
import com.facebook.presto.lakesoul.pojo.Path
import com.google.common.collect.Lists
import org.apache.hadoop.fs.Path

import java.util.{Objects, UUID}
import com.google.common.collect.Lists
import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter}
import scala.collection.mutable.ArrayBuffer
import scala.collection.{JavaConverters, mutable}
Expand Down Expand Up @@ -42,10 +42,15 @@ case class DataFileInfo(range_partitions: String, path: String, file_op: String,
/** Hash only the identity fields of a data file: its range partition,
  * path and file operation. Size/timestamp fields are deliberately
  * excluded so logically-identical files hash equally.
  * NOTE(review): must stay consistent with equals — equals is not fully
  * visible in this hunk; confirm it compares the same three fields.
  */
override def hashCode(): Int = {
Objects.hash(range_partitions, path, file_op)
}

// Composite key "<range_partitions>-<file_exist_cols>" — presumably used
// to distinguish file versions per partition/schema combination; TODO
// confirm against callers (not visible in this hunk).
lazy val range_version: String = range_partitions + "-" + file_exist_cols

// Transform this entry into one marked for deletion: returns a copy whose
// modification_time is set to the given delete timestamp.
def expire(deleteTime: Long): DataFileInfo = this.copy(modification_time = deleteTime)
}

case class PartitionInfo(table_id: String, range_value: String, version: Int = -1,
read_files: Array[UUID] = Array.empty[UUID], expression: String = "", commit_op: String = "") {
case class PartitionInfoScala(table_id: String, range_value: String, version: Int = -1,
read_files: Array[UUID] = Array.empty[UUID], expression: String = "", commit_op: String = "") {
override def toString: String = {
s"partition info: {\ntable_name: $table_id,\nrange_value: $range_value}"
}
Expand All @@ -59,7 +64,7 @@ object DataOperation {
getTableDataInfo(MetaVersion.getAllPartitionInfo(tableId))
}

def getTableDataInfo(partition_info_arr: Array[PartitionInfo]): Array[DataFileInfo] = {
def getTableDataInfo(partition_info_arr: Array[PartitionInfoScala]): Array[DataFileInfo] = {

val file_info_buf = new ArrayBuffer[DataFileInfo]()

Expand All @@ -73,7 +78,7 @@ object DataOperation {

def getTableDataInfo(tableId: String, partitions: List[String]): Array[DataFileInfo] = {
val Pars = MetaVersion.getAllPartitionInfo(tableId)
val partitionInfos = new ArrayBuffer[PartitionInfo]()
val partitionInfos = new ArrayBuffer[PartitionInfoScala]()
for (partition_info <- Pars) {
var contained = true;
for (item <- partitions) {
Expand Down Expand Up @@ -120,14 +125,14 @@ object DataOperation {
}

//get files info in this partition that match the current read version
private def getSinglePartitionDataInfo(partition_info: PartitionInfo): ArrayBuffer[DataFileInfo] = {
def getSinglePartitionDataInfo(partition_info: PartitionInfoScala): ArrayBuffer[DataFileInfo] = {
val file_arr_buf = new ArrayBuffer[DataFileInfo]()

val metaPartitionInfo = entity.PartitionInfo.newBuilder
metaPartitionInfo.setTableId(partition_info.table_id)
metaPartitionInfo.setPartitionDesc(partition_info.range_value)
metaPartitionInfo.addAllSnapshot(JavaConverters.bufferAsJavaList(partition_info.read_files.map(DBUtil.toProtoUuid).toBuffer))
val dataCommitInfoList = dbManager.getTableSinglePartitionDataInfo(metaPartitionInfo.build).asScala.toArray
val metaPartitionInfoScala = entity.PartitionInfo.newBuilder
metaPartitionInfoScala.setTableId(partition_info.table_id)
metaPartitionInfoScala.setPartitionDesc(partition_info.range_value)
metaPartitionInfoScala.addAllSnapshot(JavaConverters.bufferAsJavaList(partition_info.read_files.map(DBUtil.toProtoUuid).toBuffer))
val dataCommitInfoList = dbManager.getTableSinglePartitionDataInfo(metaPartitionInfoScala.build).asScala.toArray
for (metaDataCommitInfo <- dataCommitInfoList) {
val fileOps = metaDataCommitInfo.getFileOpsList.asScala.toArray
for (file <- fileOps) {
Expand All @@ -151,8 +156,8 @@ object DataOperation {
getSinglePartitionDataInfo(table_id, partition_desc, startTimestamp, endTime, readType).toArray
}

private def getSinglePartitionDataInfo(table_id: String, partition_desc: String, startTimestamp: Long,
endTimestamp: Long, readType: String): ArrayBuffer[DataFileInfo] = {
def getSinglePartitionDataInfo(table_id: String, partition_desc: String, startTimestamp: Long,
endTimestamp: Long, readType: String): ArrayBuffer[DataFileInfo] = {
if (readType.equals(LakeSoulOptions.ReadType.INCREMENTAL_READ) || readType
.equals(LakeSoulOptions.ReadType.SNAPSHOT_READ)) {
if (null == partition_desc || "".equals(partition_desc)) {
Expand All @@ -175,9 +180,9 @@ object DataOperation {
}
}

private def getSinglePartitionIncrementalDataInfos(table_id: String, partition_desc: String,
startVersionTimestamp: Long,
endVersionTimestamp: Long): ArrayBuffer[DataFileInfo] = {
def getSinglePartitionIncrementalDataInfos(table_id: String, partition_desc: String,
startVersionTimestamp: Long,
endVersionTimestamp: Long): ArrayBuffer[DataFileInfo] = {
val preVersionUUIDs = new mutable.LinkedHashSet[UUID]()
val compactionUUIDs = new mutable.LinkedHashSet[UUID]()
val incrementalAllUUIDs = new mutable.LinkedHashSet[UUID]()
Expand Down Expand Up @@ -221,4 +226,13 @@ object DataOperation {
fillFiles(file_arr_buf, dataCommitInfoList)
}
}


/** Delete the data-commit metadata for a single commit of one partition.
  *
  * Thin delegator to the shared DB manager; performs a metadata delete
  * only (does not remove the underlying data files).
  *
  * @param table_id  id of the table whose commit metadata is removed
  * @param range     partition description (range value) the commit belongs to
  * @param commit_id UUID of the data commit to drop
  */
def dropDataInfoData(table_id: String, range: String, commit_id: UUID): Unit = {
MetaVersion.dbManager.deleteDataCommitInfo(table_id, range, commit_id)
}

/** Delete ALL data-commit metadata for a table, across every partition
  * and commit. Thin delegator to the shared DB manager; metadata only —
  * underlying data files are not touched here.
  *
  * @param table_id id of the table whose commit metadata is removed
  */
def dropDataInfoData(table_id: String): Unit = {
MetaVersion.dbManager.deleteDataCommitInfo(table_id)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ object MetaVersion {
// )
// }

def getSinglePartitionInfo(table_id: String, range_value: String, range_id: String): PartitionInfo = {
def getSinglePartitionInfo(table_id: String, range_value: String, range_id: String): PartitionInfoScala = {
val info = dbManager.getSinglePartitionInfo(table_id, range_value)
PartitionInfo(
PartitionInfoScala(
table_id = info.getTableId,
range_value = range_value,
version = info.getVersion,
Expand All @@ -132,10 +132,10 @@ object MetaVersion {
)
}

def getSinglePartitionInfoForVersion(table_id: String, range_value: String, version: Int): Array[PartitionInfo] = {
val partitionVersionBuffer = new ArrayBuffer[PartitionInfo]()
def getSinglePartitionInfoForVersion(table_id: String, range_value: String, version: Int): Array[PartitionInfoScala] = {
val partitionVersionBuffer = new ArrayBuffer[PartitionInfoScala]()
val info = dbManager.getSinglePartitionInfo(table_id, range_value, version)
partitionVersionBuffer += PartitionInfo(
partitionVersionBuffer += PartitionInfoScala(
table_id = info.getTableId,
range_value = range_value,
version = info.getVersion,
Expand All @@ -147,12 +147,12 @@ object MetaVersion {

}

def getOnePartitionVersions(table_id: String, range_value: String): Array[PartitionInfo] = {
val partitionVersionBuffer = new ArrayBuffer[PartitionInfo]()
def getOnePartitionVersions(table_id: String, range_value: String): Array[PartitionInfoScala] = {
val partitionVersionBuffer = new ArrayBuffer[PartitionInfoScala]()
val res_itr = dbManager.getOnePartitionVersions(table_id, range_value).iterator()
while (res_itr.hasNext) {
val res = res_itr.next()
partitionVersionBuffer += PartitionInfo(
partitionVersionBuffer += PartitionInfoScala(
table_id = res.getTableId,
range_value = res.getPartitionDesc,
version = res.getVersion,
Expand Down Expand Up @@ -184,12 +184,12 @@ object MetaVersion {
(false, "")
}

def getAllPartitionInfo(table_id: String): Array[PartitionInfo] = {
val partitionVersionBuffer = new ArrayBuffer[PartitionInfo]()
def getAllPartitionInfo(table_id: String): Array[PartitionInfoScala] = {
val partitionVersionBuffer = new ArrayBuffer[PartitionInfoScala]()
val res_itr = dbManager.getAllPartitionInfo(table_id).iterator()
while (res_itr.hasNext) {
val res = res_itr.next()
partitionVersionBuffer += PartitionInfo(
partitionVersionBuffer += PartitionInfoScala(
table_id = res.getTableId,
range_value = res.getPartitionDesc,
version = res.getVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,11 @@ public static DataFileInfo[] getTargetDataFileInfo(TableInfo tif, List<Map<Strin
List<String> partitionDescs = remainingPartitions.stream()
.map(DBUtil::formatPartitionDesc)
.collect(Collectors.toList());
List<PartitionInfo> partitionInfos = new ArrayList<>();
List<PartitionInfoScala> partitionInfos = new ArrayList<>();
for (String partitionDesc : partitionDescs) {
partitionInfos.add(MetaVersion.getSinglePartitionInfo(tif.getTableId(), partitionDesc, ""));
}
PartitionInfo[] ptinfos = partitionInfos.toArray(new PartitionInfo[0]);
PartitionInfoScala[] ptinfos = partitionInfos.toArray(new PartitionInfoScala[0]);
return DataOperation.getTableDataInfo(ptinfos);
}
}
Expand All @@ -433,8 +433,8 @@ public static Map<String, Map<Integer, List<Path>>> splitDataInfosToRangeAndHash
}

public static DataFileInfo[] getSinglePartitionDataFileInfo(TableInfo tif, String partitionDesc) {
PartitionInfo partitionInfo = MetaVersion.getSinglePartitionInfo(tif.getTableId(), partitionDesc, "");
return DataOperation.getTableDataInfo(new PartitionInfo[]{partitionInfo});
PartitionInfoScala partitionInfo = MetaVersion.getSinglePartitionInfo(tif.getTableId(), partitionDesc, "");
return DataOperation.getTableDataInfo(new PartitionInfoScala[]{partitionInfo});
}

public static int[] getFieldPositions(String[] fields, List<String> allFields) {
Expand Down
Loading

0 comments on commit 3fa3a9a

Please sign in to comment.