From 0be6fceabefed3fd4f83a12bde88538b90c16a1d Mon Sep 17 00:00:00 2001 From: Scott Dugas Date: Mon, 9 Dec 2024 16:42:05 -0500 Subject: [PATCH] WIP #2990: make LucenePartitioner thread safe This changes the concurrentUpdate test to run with a single partition, and adds usages of AsyncLock to ensure that updates to the partition metadata work correctly. Before calling the issue complete, two additional tests should be added: one that does concurrent inserts and one that does concurrent deletes. --- docs/ReleaseNotes.md | 2 +- .../record/RecordCoreInternalException.java | 38 ++++ .../record/lucene/LuceneIndexMaintainer.java | 32 ++-- .../record/lucene/LucenePartitioner.java | 162 ++++++++++-------- .../lucene/LuceneIndexMaintenanceTest.java | 8 +- .../lucene/LuceneIndexTestValidator.java | 3 +- 6 files changed, 156 insertions(+), 89 deletions(-) create mode 100644 fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index d5b7e90ef4..678fcc72a2 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -17,7 +17,7 @@ The Apache Commons library has been removed as a dependency. 
There were a few lo * **Bug fix** Fix 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Bug fix** Fix 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) -* **Bug fix** Fix 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) +* **Bug fix** LucenePartitioner is now thread safe [(Issue #2990)](https://github.com/FoundationDB/fdb-record-layer/issues/2990) * **Bug fix** Fix 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Bug fix** Fix 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Performance** Improvement 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java new file mode 100644 index 0000000000..fa3d572f41 --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java @@ -0,0 +1,38 @@ +/* + * RecordCoreInternalException.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.apple.foundationdb.record; + +import com.apple.foundationdb.annotation.API; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Exception thrown when an inconsistency in core record layer behavior is detected. + */ +@API(API.Status.STABLE) +@SuppressWarnings("serial") +public class RecordCoreInternalException extends RecordCoreException { + + public RecordCoreInternalException(@Nonnull final String msg, @Nullable final Object... keyValues) { + super(msg, keyValues); + } +} diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java index 7fb25ac366..26aecfbca8 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java @@ -245,6 +245,14 @@ private void insertField(LuceneDocumentFromRecord.DocumentField field, final Doc } } + void writeDocument(final FDBIndexableRecord newRecord, final Map.Entry> entry, final Integer partitionId) { + try { + writeDocument(entry.getValue(), entry.getKey(), partitionId, newRecord.getPrimaryKey()); + } catch (IOException e) { + throw LuceneExceptions.toRecordCoreException("Issue updating new index keys", e, "newRecord", newRecord.getPrimaryKey()); + } + } + @SuppressWarnings("PMD.CloseResource") private void writeDocument(@Nonnull List fields, Tuple groupingKey, @@ -487,14 +495,11 @@ CompletableFuture update(@Nullable FDBIndexableRecord< AsyncUtil.whenAll(newRecordFields.entrySet().stream().map(entry -> { try { return tryDeleteInWriteOnlyMode(Objects.requireNonNull(newRecord), entry.getKey()).thenCompose(countDeleted -> - partitioner.addToAndSavePartitionMetadata(newRecord, entry.getKey(), destinationPartitionIdHint).thenApply(partitionId -> { - try { - 
writeDocument(entry.getValue(), entry.getKey(), partitionId, newRecord.getPrimaryKey()); - } catch (IOException e) { - throw LuceneExceptions.toRecordCoreException("Issue updating new index keys", e, "newRecord", newRecord.getPrimaryKey()); - } - return null; - })); + partitioner.addToAndSavePartitionMetadata(newRecord, entry.getKey(), destinationPartitionIdHint) + .thenApply(partitionId -> { + writeDocument(newRecord, entry, partitionId); + return null; + })); } catch (IOException e) { throw LuceneExceptions.toRecordCoreException("Issue updating", e, "record", Objects.requireNonNull(newRecord).getPrimaryKey()); } @@ -541,19 +546,22 @@ private CompletableFuture tryDelete(@Nonnull FDBInd } // partitioned - return partitioner.tryGetPartitionInfo(record, groupingKey).thenApply(partitionInfo -> { + return partitioner.tryGetPartitionInfo(record, groupingKey).thenCompose(partitionInfo -> { + // this might be 0 when in writeOnly mode, but otherwise should not happen. if (partitionInfo != null) { try { int countDeleted = deleteDocument(groupingKey, partitionInfo.getId(), record.getPrimaryKey()); if (countDeleted > 0) { - partitioner.decrementCountAndSave(groupingKey, partitionInfo, countDeleted); + return partitioner.decrementCountAndSave(groupingKey, countDeleted, partitionInfo.getId()) + .thenApply(vignore -> countDeleted); + } else { + return CompletableFuture.completedFuture(countDeleted); } - return countDeleted; } catch (IOException e) { throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", record.getPrimaryKey()); } } - return 0; + return CompletableFuture.completedFuture(0); }); } diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java index 8af4abcc03..1786b9121c 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java +++ 
b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java @@ -33,6 +33,7 @@ import com.apple.foundationdb.record.PipelineOperation; import com.apple.foundationdb.record.RecordCoreArgumentException; import com.apple.foundationdb.record.RecordCoreException; +import com.apple.foundationdb.record.RecordCoreInternalException; import com.apple.foundationdb.record.RecordCursor; import com.apple.foundationdb.record.RecordCursorContinuation; import com.apple.foundationdb.record.RecordCursorEndContinuation; @@ -40,6 +41,7 @@ import com.apple.foundationdb.record.ScanProperties; import com.apple.foundationdb.record.TupleRange; import com.apple.foundationdb.record.cursors.ChainedCursor; +import com.apple.foundationdb.record.locking.LockIdentifier; import com.apple.foundationdb.record.logging.KeyValueLogMessage; import com.apple.foundationdb.record.logging.LogMessageKeys; import com.apple.foundationdb.record.lucene.directory.FDBDirectoryManager; @@ -81,7 +83,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -489,28 +490,30 @@ public CompletableFuture addToAndSavePartitionMetad private CompletableFuture addToAndSavePartitionMetadata(@Nonnull final Tuple groupingKey, @Nonnull final Tuple partitioningKey, @Nullable final Integer assignedPartitionIdOverride) { - - final CompletableFuture assignmentFuture; - if (assignedPartitionIdOverride != null) { - assignmentFuture = getPartitionMetaInfoById(assignedPartitionIdOverride, groupingKey); - } else { - assignmentFuture = getOrCreatePartitionInfo(groupingKey, partitioningKey); - } - return assignmentFuture.thenApply(assignedPartition -> { - // assignedPartition is not null, since a new one is created by the previous call if none exist - LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = 
Objects.requireNonNull(assignedPartition).toBuilder(); - builder.setCount(assignedPartition.getCount() + 1); - if (isOlderThan(partitioningKey, assignedPartition)) { - // clear the previous key - state.context.ensureActive().clear(partitionMetadataKeyFromPartitioningValue(groupingKey, getPartitionKey(assignedPartition))); - builder.setFrom(ByteString.copyFrom(partitioningKey.pack())); - } - if (isNewerThan(partitioningKey, assignedPartition)) { - builder.setTo(ByteString.copyFrom(partitioningKey.pack())); - } - savePartitionMetadata(groupingKey, builder); - return assignedPartition.getId(); - }); + return state.context.doWithWriteLock(new LockIdentifier(partitionMetadataSubspace(groupingKey)), + () -> { + final CompletableFuture assignmentFuture; + if (assignedPartitionIdOverride != null) { + assignmentFuture = getPartitionMetaInfoById(assignedPartitionIdOverride, groupingKey); + } else { + assignmentFuture = getOrCreatePartitionInfo(groupingKey, partitioningKey); + } + return assignmentFuture.thenApply(assignedPartition -> { + // assignedPartition is not null, since a new one is created by the previous call if none exist + LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(assignedPartition).toBuilder(); + builder.setCount(assignedPartition.getCount() + 1); + if (isOlderThan(partitioningKey, assignedPartition)) { + // clear the previous key + state.context.ensureActive().clear(partitionMetadataKeyFromPartitioningValue(groupingKey, getPartitionKey(assignedPartition))); + builder.setFrom(ByteString.copyFrom(partitioningKey.pack())); + } + if (isNewerThan(partitioningKey, assignedPartition)) { + builder.setTo(ByteString.copyFrom(partitioningKey.pack())); + } + savePartitionMetadata(groupingKey, builder); + return assignedPartition.getId(); + }); + }); } /** @@ -525,6 +528,10 @@ byte[] partitionMetadataKeyFromPartitioningValue(@Nonnull Tuple groupKey, @Nonnu return state.indexSubspace.pack(partitionMetadataKeyTuple(groupKey, 
partitionKey)); } + Subspace partitionMetadataSubspace(@Nonnull Tuple groupKey) { + return state.indexSubspace.subspace(groupKey.add(PARTITION_META_SUBSPACE)); + } + private static Tuple partitionMetadataKeyTuple(final @Nonnull Tuple groupKey, @Nonnull Tuple partitionKey) { return groupKey.add(PARTITION_META_SUBSPACE).addAll(partitionKey); } @@ -600,22 +607,30 @@ CompletableFuture decrementCountAndSave(@Nonnull Tuple groupingKey, + int amount, final int partitionId) { + return state.context.doWithWriteLock(new LockIdentifier(partitionMetadataSubspace(groupingKey)), + () -> getPartitionMetaInfoById(partitionId, groupingKey).thenAccept(serialized -> { + if (serialized == null) { + throw new RecordCoreInternalException("Lucene partition metadata changed during delete") + .addLogInfo(LogMessageKeys.INDEX_NAME, state.index.getName()) + .addLogInfo(LogMessageKeys.INDEX_SUBSPACE, state.indexSubspace); + } + LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(serialized).toBuilder(); + // note that the to/from of the partition do not get updated, since that would require us to know + // what the next potential boundary value(s) are. The values, nonetheless, remain valid. 
+ builder.setCount(serialized.getCount() - amount); + + if (builder.getCount() < 0) { + // should never happen + throw new RecordCoreInternalException("Issue updating Lucene partition metadata (resulting count < 0)", + LogMessageKeys.PARTITION_ID, partitionId); + } + savePartitionMetadata(groupingKey, builder); + })); } /** @@ -1024,7 +1039,7 @@ private CompletableFuture moveDocsFromPartition(@Nonnull final LuceneRe timings.initializationNanos = System.nanoTime(); fetchedRecordsFuture = fetchedRecordsFuture.whenComplete((ignored, throwable) -> cursor.close()); - return fetchedRecordsFuture.thenCompose(records -> { + return fetchedRecordsFuture.thenApply(records -> { timings.searchNanos = System.nanoTime(); if (records.size() == 0) { throw new RecordCoreException("Unexpected error: 0 records fetched. repartitionContext {}", repartitioningContext); @@ -1041,7 +1056,7 @@ private CompletableFuture moveDocsFromPartition(@Nonnull final LuceneRe if (LOGGER.isDebugEnabled()) { LOGGER.debug("no records to move, partition {}", partitionInfo); } - return CompletableFuture.completedFuture(0); + return 0; } // reset partition info @@ -1092,40 +1107,43 @@ private CompletableFuture moveDocsFromPartition(@Nonnull final LuceneRe } long updateStart = System.nanoTime(); - Iterator> recordIterator = records.iterator(); final int destinationPartitionId = destinationPartition.getId(); - return AsyncUtil.whileTrue(() -> indexMaintainer.update(null, recordIterator.next(), destinationPartitionId) - .thenApply(ignored -> recordIterator.hasNext()), state.context.getExecutor()) - .thenApply(ignored -> { - if (LOGGER.isDebugEnabled()) { - long updateNanos = System.nanoTime(); - final KeyValueLogMessage logMessage = repartitionLogMessage("Repartitioned records", groupingKey, records.size(), partitionInfo); - logMessage.addKeyAndValue("totalMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - timings.startNanos)); - logMessage.addKeyAndValue("initializationMicros", 
TimeUnit.NANOSECONDS.toMicros(timings.initializationNanos - timings.startNanos)); - logMessage.addKeyAndValue("searchMicros", TimeUnit.NANOSECONDS.toMicros(timings.searchNanos - timings.initializationNanos)); - logMessage.addKeyAndValue("clearInfoMicros", TimeUnit.NANOSECONDS.toMicros(timings.clearInfoNanos - timings.searchNanos)); - if (timings.emptyingNanos > 0) { - logMessage.addKeyAndValue("emptyingMicros", TimeUnit.NANOSECONDS.toMicros(timings.emptyingNanos - timings.clearInfoNanos)); - } - if (timings.deleteNanos > 0) { - logMessage.addKeyAndValue("deleteMicros", TimeUnit.NANOSECONDS.toMicros(timings.deleteNanos - timings.clearInfoNanos)); - } - if (timings.metadataUpdateNanos > 0) { - logMessage.addKeyAndValue("metadataUpdateMicros", TimeUnit.NANOSECONDS.toMicros(timings.metadataUpdateNanos - timings.deleteNanos)); - } - if (timings.createPartitionNanos > 0) { - logMessage.addKeyAndValue("createPartitionMicros", TimeUnit.NANOSECONDS.toMicros(timings.createPartitionNanos - endCleanupNanos)); - } - logMessage.addKeyAndValue("updateMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - updateStart)); - if (timerSnapshot != null && state.context.getTimer() != null) { - logMessage.addKeysAndValues( - StoreTimer.getDifference(state.context.getTimer(), timerSnapshot) - .getKeysAndValues()); - } - LOGGER.debug(logMessage.toString()); - } - return records.size(); - }); + for (FDBIndexableRecord record : records) { + LuceneDocumentFromRecord.getRecordFields(state.index.getRootExpression(), record) + .entrySet().forEach(entry -> { + indexMaintainer.writeDocument(record, entry, destinationPartitionId); + // TODO could update the partition once + addToAndSavePartitionMetadata(record, groupingKey, destinationPartitionId); + }); + } + if (LOGGER.isDebugEnabled()) { + long updateNanos = System.nanoTime(); + final KeyValueLogMessage logMessage = repartitionLogMessage("Repartitioned records", groupingKey, records.size(), partitionInfo); + 
logMessage.addKeyAndValue("totalMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - timings.startNanos)); + logMessage.addKeyAndValue("initializationMicros", TimeUnit.NANOSECONDS.toMicros(timings.initializationNanos - timings.startNanos)); + logMessage.addKeyAndValue("searchMicros", TimeUnit.NANOSECONDS.toMicros(timings.searchNanos - timings.initializationNanos)); + logMessage.addKeyAndValue("clearInfoMicros", TimeUnit.NANOSECONDS.toMicros(timings.clearInfoNanos - timings.searchNanos)); + if (timings.emptyingNanos > 0) { + logMessage.addKeyAndValue("emptyingMicros", TimeUnit.NANOSECONDS.toMicros(timings.emptyingNanos - timings.clearInfoNanos)); + } + if (timings.deleteNanos > 0) { + logMessage.addKeyAndValue("deleteMicros", TimeUnit.NANOSECONDS.toMicros(timings.deleteNanos - timings.clearInfoNanos)); + } + if (timings.metadataUpdateNanos > 0) { + logMessage.addKeyAndValue("metadataUpdateMicros", TimeUnit.NANOSECONDS.toMicros(timings.metadataUpdateNanos - timings.deleteNanos)); + } + if (timings.createPartitionNanos > 0) { + logMessage.addKeyAndValue("createPartitionMicros", TimeUnit.NANOSECONDS.toMicros(timings.createPartitionNanos - endCleanupNanos)); + } + logMessage.addKeyAndValue("updateMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - updateStart)); + if (timerSnapshot != null && state.context.getTimer() != null) { + logMessage.addKeysAndValues( + StoreTimer.getDifference(state.context.getTimer(), timerSnapshot) + .getKeysAndValues()); + } + LOGGER.debug(logMessage.toString()); + } + return records.size(); }); } diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java index 9bb8dc845d..8c0ff00fab 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java +++ 
b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java @@ -808,7 +808,8 @@ void concurrentUpdate() throws IOException { final boolean primaryKeySegmentIndexEnabled = true; // LucenePartitioner is not thread safe, and the counts get broken // See: https://github.com/FoundationDB/fdb-record-layer/issues/2990 - final int partitionHighWatermark = -1; + // TODO parameterize this, and add a test for concurrent inserts + final int partitionHighWatermark = 100_0000; final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager) .setIsGrouped(isGrouped) .setIsSynthetic(isSynthetic) @@ -841,6 +842,9 @@ void concurrentUpdate() throws IOException { } + final LuceneIndexTestValidator luceneIndexTestValidator = new LuceneIndexTestValidator(() -> openContext(contextProps), context -> Objects.requireNonNull(dataModel.schemaSetup.apply(context))); + luceneIndexTestValidator.validate(dataModel.index, dataModel.groupingKeyToPrimaryKeyToPartitionKey, isSynthetic ? "child_str_value:forth" : "text_value:about"); + try (FDBRecordContext context = openContext(contextProps)) { FDBRecordStore recordStore = Objects.requireNonNull(dataModel.schemaSetup.apply(context)); recordStore.getIndexDeferredMaintenanceControl().setAutoMergeDuringCommit(false); @@ -854,8 +858,6 @@ void concurrentUpdate() throws IOException { commit(context); } - - final LuceneIndexTestValidator luceneIndexTestValidator = new LuceneIndexTestValidator(() -> openContext(contextProps), context -> Objects.requireNonNull(dataModel.schemaSetup.apply(context))); luceneIndexTestValidator.validate(dataModel.index, dataModel.groupingKeyToPrimaryKeyToPartitionKey, isSynthetic ? 
"child_str_value:forth" : "text_value:about"); if (isGrouped) { diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java index b6ba3bf270..ede46ffab5 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java @@ -60,6 +60,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -192,13 +193,13 @@ private void validatePartitionedGroup(final Index index, final String universalS "partitionInfo.count", partitionInfo.getCount())); // if partitionInfo.getCount() is wrong, this can be very confusing, so a different assertion might be // worthwhile + assertThat(records, hasSize(greaterThanOrEqualTo(visitedCount + partitionInfo.getCount()))); final Set expectedPrimaryKeys = Set.copyOf(records.subList(visitedCount, visitedCount + partitionInfo.getCount())); validateDocsInPartition(recordStore, index, partitionInfo.getId(), groupingKey, expectedPrimaryKeys, universalSearch); visitedCount += partitionInfo.getCount(); - assertThat(records.size(), greaterThanOrEqualTo(visitedCount)); validatePrimaryKeySegmentIndex(recordStore, index, groupingKey, partitionInfo.getId(), expectedPrimaryKeys, allowDuplicatePrimaryKeys); expectedPrimaryKeys.forEach(primaryKey -> missingDocuments.get(groupingKey).remove(primaryKey));