Skip to content

Commit

Permalink
WIP FoundationDB#2990: make LucenePartitioner thread safe
Browse files Browse the repository at this point in the history
This changes the concurrentUpdate to run with a single partition,
and adds usages of AsyncLock to ensure that updates to the partition
metadata work correctly.
Before calling the issue complete an additional test should be added
that does concurrent inserts, and one for concurrent deletes.
  • Loading branch information
ScottDugas committed Dec 9, 2024
1 parent c694659 commit 0be6fce
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 89 deletions.
2 changes: 1 addition & 1 deletion docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ The Apache Commons library has been removed as a dependency. There were a few lo
* **Bug fix** Fix 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Fix 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Fix 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** LucenePartitioner is now thread safe [(Issue #2990)](https://github.com/FoundationDB/fdb-record-layer/issues/2990)
* **Bug fix** Fix 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Fix 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Performance** Improvement 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* RecordCoreInternalException.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.record;

import com.apple.foundationdb.annotation.API;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Exception thrown when an inconsistency in core record layer behavior is detected.
*/
@API(API.Status.STABLE)
@SuppressWarnings("serial")
public class RecordCoreInternalException extends RecordCoreException {

public RecordCoreInternalException(@Nonnull final String msg, @Nullable final Object... keyValues) {
super(msg, keyValues);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,14 @@ private void insertField(LuceneDocumentFromRecord.DocumentField field, final Doc
}
}

<M extends Message> void writeDocument(final FDBIndexableRecord<M> newRecord, final Map.Entry<Tuple, List<LuceneDocumentFromRecord.DocumentField>> entry, final Integer partitionId) {
try {
writeDocument(entry.getValue(), entry.getKey(), partitionId, newRecord.getPrimaryKey());
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException("Issue updating new index keys", e, "newRecord", newRecord.getPrimaryKey());
}
}

@SuppressWarnings("PMD.CloseResource")
private void writeDocument(@Nonnull List<LuceneDocumentFromRecord.DocumentField> fields,
Tuple groupingKey,
Expand Down Expand Up @@ -487,14 +495,11 @@ <M extends Message> CompletableFuture<Void> update(@Nullable FDBIndexableRecord<
AsyncUtil.whenAll(newRecordFields.entrySet().stream().map(entry -> {
try {
return tryDeleteInWriteOnlyMode(Objects.requireNonNull(newRecord), entry.getKey()).thenCompose(countDeleted ->
partitioner.addToAndSavePartitionMetadata(newRecord, entry.getKey(), destinationPartitionIdHint).thenApply(partitionId -> {
try {
writeDocument(entry.getValue(), entry.getKey(), partitionId, newRecord.getPrimaryKey());
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException("Issue updating new index keys", e, "newRecord", newRecord.getPrimaryKey());
}
return null;
}));
partitioner.addToAndSavePartitionMetadata(newRecord, entry.getKey(), destinationPartitionIdHint)
.thenApply(partitionId -> {
writeDocument(newRecord, entry, partitionId);
return null;
}));
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException("Issue updating", e, "record", Objects.requireNonNull(newRecord).getPrimaryKey());
}
Expand Down Expand Up @@ -541,19 +546,22 @@ private <M extends Message> CompletableFuture<Integer> tryDelete(@Nonnull FDBInd
}

// partitioned
return partitioner.tryGetPartitionInfo(record, groupingKey).thenApply(partitionInfo -> {
return partitioner.tryGetPartitionInfo(record, groupingKey).thenCompose(partitionInfo -> {
// this might be 0 when in writeOnly mode, but otherwise should not happen.
if (partitionInfo != null) {
try {
int countDeleted = deleteDocument(groupingKey, partitionInfo.getId(), record.getPrimaryKey());
if (countDeleted > 0) {
partitioner.decrementCountAndSave(groupingKey, partitionInfo, countDeleted);
return partitioner.decrementCountAndSave(groupingKey, countDeleted, partitionInfo.getId())
.thenApply(vignore -> countDeleted);
} else {
return CompletableFuture.completedFuture(countDeleted);
}
return countDeleted;
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", record.getPrimaryKey());
}
}
return 0;
return CompletableFuture.completedFuture(0);
});
}

Expand Down
Loading

0 comments on commit 0be6fce

Please sign in to comment.