diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index d5b7e90ef4..678fcc72a2 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -17,7 +17,7 @@ The Apache Commons library has been removed as a dependency. There were a few lo * **Bug fix** Fix 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Bug fix** Fix 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) -* **Bug fix** Fix 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) +* **Bug fix** LucenePartitioner is now thread safe [(Issue #2990)](https://github.com/FoundationDB/fdb-record-layer/issues/2990) * **Bug fix** Fix 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Bug fix** Fix 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Performance** Improvement 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java new file mode 100644 index 0000000000..fa3d572f41 --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java @@ -0,0 +1,38 @@ +/* + * RecordCoreInternalException.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record; + +import com.apple.foundationdb.annotation.API; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Exception thrown when an inconsistency in core record layer behavior is detected. The message and the optional {@code keyValues} are passed unchanged to the {@link RecordCoreException} constructor; callers supply {@code keyValues} as alternating key, value arguments. + */ +@API(API.Status.STABLE) +@SuppressWarnings("serial") +public class RecordCoreInternalException extends RecordCoreException { + + public RecordCoreInternalException(@Nonnull final String msg, @Nullable final Object... keyValues) { + super(msg, keyValues); + } +} diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java index 7fb25ac366..26aecfbca8 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java @@ -245,6 +245,14 @@ private void insertField(LuceneDocumentFromRecord.DocumentField field, final Doc } } + void writeDocument(final FDBIndexableRecord newRecord, final Map.Entry> entry, final Integer partitionId) { // package-private overload: unpacks the entry (key = grouping key, value = fields) and rethrows IOException via LuceneExceptions.toRecordCoreException + try { + writeDocument(entry.getValue(), entry.getKey(), partitionId, newRecord.getPrimaryKey()); + } catch (IOException e) { + throw LuceneExceptions.toRecordCoreException("Issue updating new index keys", e, "newRecord", newRecord.getPrimaryKey()); + } + } + @SuppressWarnings("PMD.CloseResource") private void writeDocument(@Nonnull List fields, Tuple 
groupingKey, @@ -487,14 +495,11 @@ CompletableFuture update(@Nullable FDBIndexableRecord< AsyncUtil.whenAll(newRecordFields.entrySet().stream().map(entry -> { try { return tryDeleteInWriteOnlyMode(Objects.requireNonNull(newRecord), entry.getKey()).thenCompose(countDeleted -> - partitioner.addToAndSavePartitionMetadata(newRecord, entry.getKey(), destinationPartitionIdHint).thenApply(partitionId -> { - try { - writeDocument(entry.getValue(), entry.getKey(), partitionId, newRecord.getPrimaryKey()); - } catch (IOException e) { - throw LuceneExceptions.toRecordCoreException("Issue updating new index keys", e, "newRecord", newRecord.getPrimaryKey()); - } - return null; - })); + partitioner.addToAndSavePartitionMetadata(newRecord, entry.getKey(), destinationPartitionIdHint) + .thenApply(partitionId -> { + writeDocument(newRecord, entry, partitionId); + return null; + })); } catch (IOException e) { throw LuceneExceptions.toRecordCoreException("Issue updating", e, "record", Objects.requireNonNull(newRecord).getPrimaryKey()); } @@ -541,19 +546,22 @@ private CompletableFuture tryDelete(@Nonnull FDBInd } // partitioned - return partitioner.tryGetPartitionInfo(record, groupingKey).thenApply(partitionInfo -> { + return partitioner.tryGetPartitionInfo(record, groupingKey).thenCompose(partitionInfo -> { + // partitionInfo might be null (in which case 0 documents are deleted) when in writeOnly mode, but otherwise should not happen. 
if (partitionInfo != null) { try { int countDeleted = deleteDocument(groupingKey, partitionInfo.getId(), record.getPrimaryKey()); if (countDeleted > 0) { - partitioner.decrementCountAndSave(groupingKey, partitionInfo, countDeleted); + return partitioner.decrementCountAndSave(groupingKey, countDeleted, partitionInfo.getId()) + .thenApply(vignore -> countDeleted); + } else { + return CompletableFuture.completedFuture(countDeleted); } - return countDeleted; } catch (IOException e) { throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", record.getPrimaryKey()); } } - return 0; + return CompletableFuture.completedFuture(0); }); } diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java index 8af4abcc03..1786b9121c 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java @@ -33,6 +33,7 @@ import com.apple.foundationdb.record.PipelineOperation; import com.apple.foundationdb.record.RecordCoreArgumentException; import com.apple.foundationdb.record.RecordCoreException; +import com.apple.foundationdb.record.RecordCoreInternalException; import com.apple.foundationdb.record.RecordCursor; import com.apple.foundationdb.record.RecordCursorContinuation; import com.apple.foundationdb.record.RecordCursorEndContinuation; @@ -40,6 +41,7 @@ import com.apple.foundationdb.record.ScanProperties; import com.apple.foundationdb.record.TupleRange; import com.apple.foundationdb.record.cursors.ChainedCursor; +import com.apple.foundationdb.record.locking.LockIdentifier; import com.apple.foundationdb.record.logging.KeyValueLogMessage; import com.apple.foundationdb.record.logging.LogMessageKeys; import com.apple.foundationdb.record.lucene.directory.FDBDirectoryManager; 
@@ -81,7 +83,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -489,28 +490,30 @@ public CompletableFuture addToAndSavePartitionMetad private CompletableFuture addToAndSavePartitionMetadata(@Nonnull final Tuple groupingKey, @Nonnull final Tuple partitioningKey, @Nullable final Integer assignedPartitionIdOverride) { - - final CompletableFuture assignmentFuture; - if (assignedPartitionIdOverride != null) { - assignmentFuture = getPartitionMetaInfoById(assignedPartitionIdOverride, groupingKey); - } else { - assignmentFuture = getOrCreatePartitionInfo(groupingKey, partitioningKey); - } - return assignmentFuture.thenApply(assignedPartition -> { - // assignedPartition is not null, since a new one is created by the previous call if none exist - LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(assignedPartition).toBuilder(); - builder.setCount(assignedPartition.getCount() + 1); - if (isOlderThan(partitioningKey, assignedPartition)) { - // clear the previous key - state.context.ensureActive().clear(partitionMetadataKeyFromPartitioningValue(groupingKey, getPartitionKey(assignedPartition))); - builder.setFrom(ByteString.copyFrom(partitioningKey.pack())); - } - if (isNewerThan(partitioningKey, assignedPartition)) { - builder.setTo(ByteString.copyFrom(partitioningKey.pack())); - } - savePartitionMetadata(groupingKey, builder); - return assignedPartition.getId(); - }); + return state.context.doWithWriteLock(new LockIdentifier(partitionMetadataSubspace(groupingKey)), + () -> { + final CompletableFuture assignmentFuture; + if (assignedPartitionIdOverride != null) { + assignmentFuture = getPartitionMetaInfoById(assignedPartitionIdOverride, groupingKey); + } else { + assignmentFuture = getOrCreatePartitionInfo(groupingKey, partitioningKey); + } + return 
assignmentFuture.thenApply(assignedPartition -> { + // assignedPartition is not null when it came from getOrCreatePartitionInfo, since a new one is created by that call if none exist; NOTE(review): with an override id it comes from getPartitionMetaInfoById, whose result decrementCountAndSave null-checks, so requireNonNull below would throw if that partition is missing - confirm the overridden partition always exists + LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(assignedPartition).toBuilder(); + builder.setCount(assignedPartition.getCount() + 1); + if (isOlderThan(partitioningKey, assignedPartition)) { + // clear the previous key + state.context.ensureActive().clear(partitionMetadataKeyFromPartitioningValue(groupingKey, getPartitionKey(assignedPartition))); + builder.setFrom(ByteString.copyFrom(partitioningKey.pack())); + } + if (isNewerThan(partitioningKey, assignedPartition)) { + builder.setTo(ByteString.copyFrom(partitioningKey.pack())); + } + savePartitionMetadata(groupingKey, builder); + return assignedPartition.getId(); + }); + }); } /** @@ -525,6 +528,10 @@ byte[] partitionMetadataKeyFromPartitioningValue(@Nonnull Tuple groupKey, @Nonnu return state.indexSubspace.pack(partitionMetadataKeyTuple(groupKey, partitionKey)); } + Subspace partitionMetadataSubspace(@Nonnull Tuple groupKey) { + return state.indexSubspace.subspace(groupKey.add(PARTITION_META_SUBSPACE)); + } + private static Tuple partitionMetadataKeyTuple(final @Nonnull Tuple groupKey, @Nonnull Tuple partitionKey) { return groupKey.add(PARTITION_META_SUBSPACE).addAll(partitionKey); } @@ -600,22 +607,30 @@ CompletableFuture decrementCountAndSave(@Nonnull Tuple groupingKey, + int amount, final int partitionId) { + return state.context.doWithWriteLock(new LockIdentifier(partitionMetadataSubspace(groupingKey)), + () -> getPartitionMetaInfoById(partitionId, groupingKey).thenAccept(serialized -> { + if (serialized == null) { + throw new RecordCoreInternalException("Lucene partition metadata changed during delete") + .addLogInfo(LogMessageKeys.INDEX_NAME, state.index.getName()) + .addLogInfo(LogMessageKeys.INDEX_SUBSPACE, state.indexSubspace); + } + LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = 
Objects.requireNonNull(serialized).toBuilder(); + // note that the to/from of the partition do not get updated, since that would require us to know + // what the next potential boundary value(s) are. The values, nonetheless, remain valid. + builder.setCount(serialized.getCount() - amount); + + if (builder.getCount() < 0) { + // should never happen + throw new RecordCoreInternalException("Issue updating Lucene partition metadata (resulting count < 0)", + LogMessageKeys.PARTITION_ID, partitionId); + } + savePartitionMetadata(groupingKey, builder); + })); } /** 
@@ -1092,10 +1107,19 @@ private CompletableFuture moveDocsFromPartition(@Nonnull final LuceneRe } long updateStart = System.nanoTime(); - Iterator> recordIterator = records.iterator(); final int destinationPartitionId = destinationPartition.getId(); - return AsyncUtil.whileTrue(() -> indexMaintainer.update(null, recordIterator.next(), destinationPartitionId) - .thenApply(ignored -> recordIterator.hasNext()), state.context.getExecutor()) + // writeDocument is synchronous, but addToAndSavePartitionMetadata returns a future: keep every one of those + // futures and finish only after all of them complete, so a failed metadata update fails this method too. + // TODO could update the partition once + final CompletableFuture<?>[] metadataUpdates = records.stream() + .flatMap(record -> LuceneDocumentFromRecord.getRecordFields(state.index.getRootExpression(), record) + .entrySet().stream() + .map(entry -> { + indexMaintainer.writeDocument(record, entry, destinationPartitionId); + return addToAndSavePartitionMetadata(record, groupingKey, destinationPartitionId); + })) + .toArray(CompletableFuture<?>[]::new); + return CompletableFuture.allOf(metadataUpdates) .thenApply(ignored -> { if (LOGGER.isDebugEnabled()) { long updateNanos = System.nanoTime(); 
diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java index 9bb8dc845d..8c0ff00fab 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java @@ -808,7 +808,8 @@ void concurrentUpdate() throws IOException { final boolean primaryKeySegmentIndexEnabled = true; // LucenePartitioner is not thread safe, and the counts get broken // See: https://github.com/FoundationDB/fdb-record-layer/issues/2990 - final int partitionHighWatermark = -1; + // TODO parameterize this, and add a test for concurrent inserts + final int partitionHighWatermark = 1_000_000; final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager) .setIsGrouped(isGrouped) .setIsSynthetic(isSynthetic) @@ -841,6 +842,9 @@ void concurrentUpdate() throws IOException { } + final LuceneIndexTestValidator luceneIndexTestValidator = new LuceneIndexTestValidator(() -> openContext(contextProps), context -> 
Objects.requireNonNull(dataModel.schemaSetup.apply(context))); + luceneIndexTestValidator.validate(dataModel.index, dataModel.groupingKeyToPrimaryKeyToPartitionKey, isSynthetic ? "child_str_value:forth" : "text_value:about"); + try (FDBRecordContext context = openContext(contextProps)) { FDBRecordStore recordStore = Objects.requireNonNull(dataModel.schemaSetup.apply(context)); recordStore.getIndexDeferredMaintenanceControl().setAutoMergeDuringCommit(false); @@ -854,8 +858,6 @@ void concurrentUpdate() throws IOException { commit(context); } - - final LuceneIndexTestValidator luceneIndexTestValidator = new LuceneIndexTestValidator(() -> openContext(contextProps), context -> Objects.requireNonNull(dataModel.schemaSetup.apply(context))); luceneIndexTestValidator.validate(dataModel.index, dataModel.groupingKeyToPrimaryKeyToPartitionKey, isSynthetic ? "child_str_value:forth" : "text_value:about"); if (isGrouped) { diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java index b6ba3bf270..ede46ffab5 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java @@ -60,6 +60,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -192,13 +193,13 @@ private void validatePartitionedGroup(final Index index, final String universalS "partitionInfo.count", partitionInfo.getCount())); // if partitionInfo.getCount() is 
wrong, this can be very confusing, so a different assertion might be // worthwhile + assertThat(records, hasSize(greaterThanOrEqualTo(visitedCount + partitionInfo.getCount()))); final Set expectedPrimaryKeys = Set.copyOf(records.subList(visitedCount, visitedCount + partitionInfo.getCount())); validateDocsInPartition(recordStore, index, partitionInfo.getId(), groupingKey, expectedPrimaryKeys, universalSearch); visitedCount += partitionInfo.getCount(); - assertThat(records.size(), greaterThanOrEqualTo(visitedCount)); validatePrimaryKeySegmentIndex(recordStore, index, groupingKey, partitionInfo.getId(), expectedPrimaryKeys, allowDuplicatePrimaryKeys); expectedPrimaryKeys.forEach(primaryKey -> missingDocuments.get(groupingKey).remove(primaryKey));