
Commit 430e0a3

add more synchronized to source
Signed-off-by: chenxu <[email protected]>
dmetasoul01 committed Nov 21, 2024
1 parent 0cc2a2c commit 430e0a3
Showing 3 changed files with 76 additions and 66 deletions.
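
The pattern across all three files is the same: callbacks that previously wrapped their bodies in a synchronized (this) { ... } block, or were not guarded at all, are now declared as synchronized methods, so split discovery, split requests, splits added back after a reader failure, and checkpoint snapshots all serialize on the enumerator's (or split reader's) own monitor. The added log statements record System.identityHashCode(this) and the current thread id, which helps correlate interleaved calls when diagnosing races. Below is a minimal, self-contained sketch of that locking pattern; the class and member names (SynchronizedEnumeratorSketch, pendingSplits) are illustrative only and are not part of the LakeSoul code base.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Simplified stand-in for a split enumerator: every method that touches the
// shared split queue is synchronized, so all callers contend on the same monitor.
public class SynchronizedEnumeratorSketch {

    private final Queue<String> pendingSplits = new ArrayDeque<>();

    // Invoked from an async discovery callback (cf. processDiscoveredSplits in the diff).
    public synchronized void addDiscoveredSplits(Collection<String> splits) {
        pendingSplits.addAll(splits);
    }

    // Invoked when a reader asks for work (cf. handleSplitRequest).
    public synchronized Optional<String> nextSplit() {
        return Optional.ofNullable(pendingSplits.poll());
    }

    // Invoked on checkpoint (cf. snapshotState); observes a consistent queue.
    public synchronized List<String> snapshotPendingSplits() {
        return new ArrayList<>(pendingSplits);
    }
}

Declaring a method synchronized is equivalent to wrapping its whole body in synchronized (this), which is why the explicit blocks formerly inside handleSplitRequest, addSplitsBack, snapshotState and processDiscoveredSplits could be dropped once the methods themselves took the lock.
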
LakeSoulAllPartitionDynamicSplitEnumerator.java
@@ -7,7 +7,6 @@
import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil;
import com.dmetasoul.lakesoul.meta.DataFileInfo;
import com.dmetasoul.lakesoul.meta.DataOperation;
import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
import com.dmetasoul.lakesoul.meta.MetaVersion;
import com.dmetasoul.lakesoul.meta.entity.PartitionInfo;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
@@ -16,7 +15,6 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
@@ -43,22 +41,13 @@ public class LakeSoulAllPartitionDynamicSplitEnumerator implements SplitEnumerat
private final Plan partitionFilters;
private final List<String> partitionColumns;
private final TableInfo tableInfo;
protected Schema partitionArrowSchema;
String tableId;
private long startTime;
private long nextStartTime;
private int hashBucketNum = -1;

protected Schema partitionArrowSchema;

public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext<LakeSoulPartitionSplit> context,
LakeSoulDynSplitAssigner splitAssigner,
RowType rowType,
long discoveryInterval,
long startTime,
String tableId,
String hashBucketNum,
List<String> partitionColumns,
Plan partitionFilters) {
public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext<LakeSoulPartitionSplit> context, LakeSoulDynSplitAssigner splitAssigner, RowType rowType, long discoveryInterval, long startTime, String tableId, String hashBucketNum, List<String> partitionColumns, Plan partitionFilters) {
this.context = context;
this.splitAssigner = splitAssigner;
this.discoveryInterval = discoveryInterval;
Expand All @@ -79,81 +68,91 @@ public LakeSoulAllPartitionDynamicSplitEnumerator(SplitEnumeratorContext<LakeSou

@Override
public void start() {
context.callAsync(this::enumerateSplits, this::processDiscoveredSplits, discoveryInterval,
discoveryInterval);
context.callAsync(this::enumerateSplits, this::processDiscoveredSplits, discoveryInterval, discoveryInterval);
}

@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
public synchronized void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
LOG.info("handleSplitRequest subTaskId {}, oid {}, tid {}",
System.identityHashCode(this),
subtaskId, Thread.currentThread().getId());
if (!context.registeredReaders().containsKey(subtaskId)) {
// reader failed between sending the request and now. skip this request.
return;
}
int tasksSize = context.registeredReaders().size();
synchronized (this) {
Optional<LakeSoulPartitionSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
if (nextSplit.isPresent()) {
context.assignSplit(nextSplit.get(), subtaskId);
taskIdsAwaitingSplit.remove(subtaskId);
} else {
taskIdsAwaitingSplit.add(subtaskId);
}
Optional<LakeSoulPartitionSplit> nextSplit = this.splitAssigner.getNext(subtaskId, tasksSize);
if (nextSplit.isPresent()) {
context.assignSplit(nextSplit.get(), subtaskId);
taskIdsAwaitingSplit.remove(subtaskId);
} else {
taskIdsAwaitingSplit.add(subtaskId);
}
}

@Override
public void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
LOG.info("Add split back: {}", splits);
synchronized (this) {
splitAssigner.addSplits(splits);
}
public synchronized void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
LOG.info("Add split back: {} for subTaskId {}, oid {}, tid {}",
splits, subtaskId,
System.identityHashCode(this),
Thread.currentThread().getId());
splitAssigner.addSplits(splits);
}

@Override
public void addReader(int subtaskId) {
}

@Override
public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
synchronized (this) {
LakeSoulPendingSplits pendingSplits =
new LakeSoulPendingSplits(splitAssigner.remainingSplits(), this.nextStartTime, this.tableId, "",
this.discoveryInterval, this.hashBucketNum);
LOG.info("LakeSoulAllPartitionDynamicSplitEnumerator snapshotState {}", pendingSplits);
return pendingSplits;
}
public synchronized LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
LakeSoulPendingSplits pendingSplits = new LakeSoulPendingSplits(
splitAssigner.remainingSplits(), this.nextStartTime, this.tableId,
"", this.discoveryInterval, this.hashBucketNum);
LOG.info("LakeSoulAllPartitionDynamicSplitEnumerator snapshotState chkId {}, splits {}, oid {}, tid {}",
checkpointId, pendingSplits,
System.identityHashCode(this),
Thread.currentThread().getId());
return pendingSplits;
}

@Override
public void close() throws IOException {

}

private void processDiscoveredSplits(Collection<LakeSoulPartitionSplit> splits, Throwable error) {
private synchronized void processDiscoveredSplits(
Collection<LakeSoulPartitionSplit> splits, Throwable error) {
if (error != null) {
LOG.error("Failed to enumerate files", error);
return;
}
LOG.info("Process discovered splits {}", splits);
LOG.info("Process discovered splits {}, oid {}, tid {}", splits,
System.identityHashCode(this),
Thread.currentThread().getId());
int tasksSize = context.registeredReaders().size();
synchronized (this) {
this.splitAssigner.addSplits(splits);
Iterator<Integer> iter = taskIdsAwaitingSplit.iterator();
while (iter.hasNext()) {
int taskId = iter.next();
Optional<LakeSoulPartitionSplit> al = this.splitAssigner.getNext(taskId, tasksSize);
if (al.isPresent()) {
context.assignSplit(al.get(), taskId);
iter.remove();
}
this.splitAssigner.addSplits(splits);
Iterator<Integer> iter = taskIdsAwaitingSplit.iterator();
while (iter.hasNext()) {
int taskId = iter.next();
Optional<LakeSoulPartitionSplit> al = this.splitAssigner.getNext(taskId, tasksSize);
if (al.isPresent()) {
context.assignSplit(al.get(), taskId);
iter.remove();
}
}
LOG.info("Process discovered splits done {}, oid {}, tid {}", splits,
System.identityHashCode(this),
Thread.currentThread().getId());
}

public Collection<LakeSoulPartitionSplit> enumerateSplits() {
public synchronized Collection<LakeSoulPartitionSplit> enumerateSplits() {
LOG.info("enumerateSplits begin, oid {}, tid {}",
System.identityHashCode(this),
Thread.currentThread().getId());
List<PartitionInfo> allPartitionInfo = MetaVersion.getAllPartitionInfo(tableId);
LOG.info("allPartitionInfo={}", allPartitionInfo);
List<PartitionInfo> filteredPartition = SubstraitUtil.applyPartitionFilters(allPartitionInfo, partitionArrowSchema, partitionFilters);
List<PartitionInfo> filteredPartition = SubstraitUtil.applyPartitionFilters(
allPartitionInfo, partitionArrowSchema, partitionFilters);
LOG.info("filteredPartition={}, filter={}", filteredPartition, partitionFilters);

ArrayList<LakeSoulPartitionSplit> splits = new ArrayList<>(16);
@@ -166,24 +165,28 @@ public Collection<LakeSoulPartitionSplit> enumerateSplits() {
if (partitionLatestTimestamp.containsKey(partitionDesc)) {
Long lastTimestamp = partitionLatestTimestamp.get(partitionDesc);
LOG.info("getIncrementalPartitionDataInfo, startTime={}, endTime={}", lastTimestamp, latestTimestamp);
dataFileInfos =
DataOperation.getIncrementalPartitionDataInfo(tableId, partitionDesc, lastTimestamp, latestTimestamp, "incremental");
dataFileInfos = DataOperation.getIncrementalPartitionDataInfo(
tableId, partitionDesc, lastTimestamp, latestTimestamp, "incremental");
} else {
dataFileInfos =
DataOperation.getIncrementalPartitionDataInfo(tableId, partitionDesc, startTime, latestTimestamp, "incremental");
dataFileInfos = DataOperation.getIncrementalPartitionDataInfo(
tableId, partitionDesc, startTime, latestTimestamp, "incremental");
}
if (dataFileInfos.length > 0) {
Map<String, Map<Integer, List<Path>>> splitByRangeAndHashPartition =
FlinkUtil.splitDataInfosToRangeAndHashPartition(tableInfo, dataFileInfos);
for (Map.Entry<String, Map<Integer, List<Path>>> entry : splitByRangeAndHashPartition.entrySet()) {
for (Map.Entry<Integer, List<Path>> split : entry.getValue().entrySet()) {
splits.add(new LakeSoulPartitionSplit(String.valueOf(split.hashCode()), split.getValue(), 0, split.getKey(), partitionDesc));
splits.add(new LakeSoulPartitionSplit(String.valueOf(split.hashCode()), split.getValue(),
0, split.getKey(), partitionDesc));
}
}
}
partitionLatestTimestamp.put(partitionDesc, latestTimestamp);
}
LOG.info("partitionLatestTimestamp={}", partitionLatestTimestamp);
LOG.info("dynamic enumerate done, partitionLatestTimestamp={}, oid {}, tid {}",
partitionLatestTimestamp,
System.identityHashCode(this),
Thread.currentThread().getId());

return splits;
}
LakeSoulSplitReader.java
@@ -66,12 +66,17 @@ public LakeSoulSplitReader(Configuration conf,
}

@Override
public RecordsWithSplitIds<RowData> fetch() throws IOException {
public synchronized RecordsWithSplitIds<RowData> fetch() throws IOException {
try {
close();
LakeSoulPartitionSplit split = splits.poll();
LOG.info("Fetched split {}, oid {}, tid {}",
split,
System.identityHashCode(this),
Thread.currentThread().getId());
lastSplitReader =
new LakeSoulOneSplitRecordsReader(this.conf,
Objects.requireNonNull(splits.poll()),
Objects.requireNonNull(split),
this.tableRowType,
this.projectedRowType,
this.projectedRowTypeWithPk,
@@ -88,15 +93,17 @@ public RecordsWithSplitIds<RowData> fetch() throws IOException {
}

@Override
public void handleSplitsChanges(SplitsChange<LakeSoulPartitionSplit> splitChange) {
public synchronized void handleSplitsChanges(SplitsChange<LakeSoulPartitionSplit> splitChange) {
if (!(splitChange instanceof SplitsAddition)) {
throw new UnsupportedOperationException(
String.format("The SplitChange type of %s is not supported.",
splitChange.getClass()));
}

LOG.info("Handling split change {}",
splitChange);
LOG.info("Handling split change {}, oid {}, tid {}",
splitChange,
System.identityHashCode(this),
Thread.currentThread().getId());
splits.addAll(splitChange.splits());
}

@@ -105,7 +112,7 @@ public void wakeUp() {
}

@Override
public void close() throws Exception {
public synchronized void close() throws Exception {
if (lastSplitReader != null) {
lastSplitReader.close();
lastSplitReader = null;
LakeSoulStaticSplitEnumerator.java
@@ -33,7 +33,7 @@ public void start() {
}

@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
public synchronized void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
if (!context.registeredReaders().containsKey(subtaskId)) {
// reader failed between sending the request and now. skip this request.
return;
@@ -51,7 +51,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
}

@Override
public void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
public synchronized void addSplitsBack(List<LakeSoulPartitionSplit> splits, int subtaskId) {
LOG.info("Add split back: {}", splits);
splitAssigner.addSplits(splits);
}
@@ -61,7 +61,7 @@ public void addReader(int subtaskId) {
}

@Override
public LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
public synchronized LakeSoulPendingSplits snapshotState(long checkpointId) throws Exception {
LOG.info("LakeSoulStaticSplitEnumerator snapshotState");
return null;
}