From c694659d9403f19fd7fa96c0ef0cd45567e44b08 Mon Sep 17 00:00:00 2001
From: Scott Dugas
Date: Thu, 5 Dec 2024 16:45:28 -0500
Subject: [PATCH 1/2] Add a test that does concurrent lucene updates
---
.../lucene/LuceneIndexMaintenanceTest.java | 82 +++++++++
.../lucene/LuceneIndexTestDataModel.java | 51 +++++-
.../lucene/LuceneIndexTestValidator.java | 156 +++++++++++-------
3 files changed, 220 insertions(+), 69 deletions(-)
diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
index ebf8dc8d32..9bb8dc845d 100644
--- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
+++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
@@ -75,6 +75,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.IdentityHashMap;
@@ -86,6 +87,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -781,6 +784,85 @@ public void ensureValid() throws IOException {
}
}
+ @Test
+ void concurrentUpdate() throws IOException {
+ // Once the two issues noted below are fixed, we should make this parameterized, and run with additional random
+ // configurations.
+ AtomicInteger threadCounter = new AtomicInteger();
+ // Synchronization blocks in FDBDirectoryWrapper can cause a deadlock
+ // see https://github.com/FoundationDB/fdb-record-layer/issues/2989
+ // So set the pool large enough to overcome that
+ this.dbExtension.getDatabaseFactory().setExecutor(new ForkJoinPool(30,
+ pool -> {
+ final ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+ thread.setName("ConcurrentUpdatePool-" + threadCounter.getAndIncrement());
+ return thread;
+ },
+ null, false));
+ final long seed = 320947L;
+ final boolean isGrouped = true;
+ // updating the parent & child concurrently is not thread safe, we may want to fix the behavior, or say that is
+ // not supported, as it is the same as updating the same record concurrently, which I don't think is generally
+ // supported.
+ final boolean isSynthetic = false;
+ final boolean primaryKeySegmentIndexEnabled = true;
+ // LucenePartitioner is not thread safe, and the counts get broken
+ // See: https://github.com/FoundationDB/fdb-record-layer/issues/2990
+ final int partitionHighWatermark = -1;
+ final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager)
+ .setIsGrouped(isGrouped)
+ .setIsSynthetic(isSynthetic)
+ .setPrimaryKeySegmentIndexEnabled(primaryKeySegmentIndexEnabled)
+ .setPartitionHighWatermark(partitionHighWatermark)
+ .build();
+
+ final int repartitionCount = 10;
+ final int loopCount = 30;
+
+ final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder()
+ .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, repartitionCount)
+ .addProp(LuceneRecordContextProperties.LUCENE_MAX_DOCUMENTS_TO_MOVE_DURING_REPARTITIONING, dataModel.nextInt(1000) + repartitionCount)
+ .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0
+ .build();
+ for (int i = 0; i < loopCount; i++) {
+ LOGGER.info(KeyValueLogMessage.of("concurrentUpdate loop",
+ "iteration", i,
+ "groupCount", dataModel.groupingKeyToPrimaryKeyToPartitionKey.size(),
+ "docCount", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).sum(),
+ "docMinPerGroup", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).min(),
+ "docMaxPerGroup", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).max()));
+
+ long start = 234098;
+ try (FDBRecordContext context = openContext(contextProps)) {
+ dataModel.saveRecords(10, start, context, 1);
+ commit(context);
+ }
+ explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);
+ }
+
+
+ try (FDBRecordContext context = openContext(contextProps)) {
+ FDBRecordStore recordStore = Objects.requireNonNull(dataModel.schemaSetup.apply(context));
+ recordStore.getIndexDeferredMaintenanceControl().setAutoMergeDuringCommit(false);
+ assertThat(dataModel.updateableRecords, Matchers.aMapWithSize(Matchers.greaterThan(30)));
+ LOGGER.info("concurrentUpdate: Starting updates");
+ RecordCursor.fromList(new ArrayList<>(dataModel.updateableRecords.entrySet()))
+ .mapPipelined(entry -> {
+ return dataModel.updateRecord(recordStore, entry.getKey(), entry.getValue());
+ }, 10)
+ .asList().join();
+ commit(context);
+ }
+
+
+ final LuceneIndexTestValidator luceneIndexTestValidator = new LuceneIndexTestValidator(() -> openContext(contextProps), context -> Objects.requireNonNull(dataModel.schemaSetup.apply(context)));
+ luceneIndexTestValidator.validate(dataModel.index, dataModel.groupingKeyToPrimaryKeyToPartitionKey, isSynthetic ? "child_str_value:forth" : "text_value:about");
+
+ if (isGrouped) {
+ validateDeleteWhere(isSynthetic, dataModel.groupingKeyToPrimaryKeyToPartitionKey, contextProps, dataModel.schemaSetup, dataModel.index);
+ }
+ }
+
static Stream concurrentStoreTest() {
return Stream.concat(
Stream.of(
diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java
index 58e12ba3cb..8724c8f913 100644
--- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java
+++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java
@@ -34,6 +34,7 @@
import com.apple.foundationdb.record.test.TestKeySpace;
import com.apple.foundationdb.record.test.TestKeySpacePathManagerExtension;
import com.apple.foundationdb.tuple.Tuple;
+import com.google.protobuf.Message;
import javax.annotation.Nonnull;
import java.util.HashMap;
@@ -41,6 +42,7 @@
import java.util.Objects;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
/**
@@ -64,6 +66,7 @@ public class LuceneIndexTestDataModel {
*
*/
final Map> groupingKeyToPrimaryKeyToPartitionKey;
+ Map> updateableRecords;
private LuceneIndexTestDataModel(final Builder builder) {
random = builder.random;
@@ -92,6 +95,7 @@ private LuceneIndexTestDataModel(final Builder builder) {
return store;
};
groupingKeyToPrimaryKeyToPartitionKey = new HashMap<>();
+ updateableRecords = new HashMap<>();
}
@Override
@@ -121,7 +125,7 @@ void saveRecords(int count, long start, FDBRecordContext context, final int grou
FDBRecordStore recordStore = Objects.requireNonNull(schemaSetup.apply(context));
for (int j = 0; j < count; j++) {
LuceneIndexTestDataModel.saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey,
- textGenerator, start, recordStore, group);
+ textGenerator, start, recordStore, group, updateableRecords);
}
}
@@ -129,10 +133,17 @@ static void saveRecord(final boolean isGrouped, final boolean isSynthetic, final
final Map> groupingKeyToPrimaryKeyToPartitionKey,
final RandomTextGenerator textGenerator, final long start, final FDBRecordStore recordStore,
final int group) {
+ saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey, textGenerator, start, recordStore, group, new HashMap<>());
+ }
+
+ static void saveRecord(final boolean isGrouped, final boolean isSynthetic, final Random random,
+ final Map> groupingKeyToPrimaryKeyToPartitionKey,
+ final RandomTextGenerator textGenerator, final long start, final FDBRecordStore recordStore,
+ final int group, final Map> updateableRecords) {
final Tuple groupTuple = isGrouped ? Tuple.from(group) : Tuple.from();
final int countInGroup = groupingKeyToPrimaryKeyToPartitionKey.computeIfAbsent(groupTuple, key -> new HashMap<>()).size();
long timestamp = start + countInGroup + random.nextInt(20) - 5;
- final Tuple primaryKey = saveRecord(recordStore, isSynthetic, group, countInGroup, timestamp, textGenerator, random);
+ final Tuple primaryKey = saveRecord(recordStore, isSynthetic, group, countInGroup, timestamp, textGenerator, random, updateableRecords);
groupingKeyToPrimaryKeyToPartitionKey.computeIfAbsent(groupTuple, key -> new HashMap<>())
.put(primaryKey, Tuple.from(timestamp).addAll(primaryKey));
}
@@ -144,7 +155,8 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
final int countInGroup,
final long timestamp,
final RandomTextGenerator textGenerator,
- final Random random) {
+ final Random random,
+ final Map> updateableRecords) {
var parent = TestRecordsGroupedParentChildProto.MyParentRecord.newBuilder()
.setGroup(group)
.setRecNo(1001L + countInGroup)
@@ -155,6 +167,8 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
.setChildRecNo(1000L - countInGroup)
.build();
Tuple primaryKey;
+ final Tuple parentPrimaryKey = recordStore.saveRecord(parent).getPrimaryKey();
+ updateableRecords.put(parentPrimaryKey, existingRecord -> updateParentRecord(existingRecord, random));
if (isSynthetic) {
var child = TestRecordsGroupedParentChildProto.MyChildRecord.newBuilder()
.setGroup(group)
@@ -165,15 +179,40 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
final Tuple syntheticRecordTypeKey = recordStore.getRecordMetaData()
.getSyntheticRecordType("JoinChildren")
.getRecordTypeKeyTuple();
+ final Tuple childPrimaryKey = recordStore.saveRecord(child).getPrimaryKey();
+ updateableRecords.put(childPrimaryKey, existingRecord -> updateChildRecord(existingRecord, random));
primaryKey = Tuple.from(syntheticRecordTypeKey.getItems().get(0),
- recordStore.saveRecord(parent).getPrimaryKey().getItems(),
- recordStore.saveRecord(child).getPrimaryKey().getItems());
+ parentPrimaryKey.getItems(),
+ childPrimaryKey.getItems());
} else {
- primaryKey = recordStore.saveRecord(parent).getPrimaryKey();
+ primaryKey = parentPrimaryKey;
}
return primaryKey;
}
+
+ private static Message updateParentRecord(Message existingRecord, final Random random) {
+ final var builder = TestRecordsGroupedParentChildProto.MyParentRecord.newBuilder();
+ builder.mergeFrom(existingRecord);
+ builder.setIntValue(random.nextInt());
+ return builder.build();
+ }
+
+ private static Message updateChildRecord(final Message existingRecord, final Random random) {
+ final var builder = TestRecordsGroupedParentChildProto.MyChildRecord.newBuilder();
+ builder.mergeFrom(existingRecord);
+ builder.setOtherValue(random.nextInt());
+ return builder.build();
+ }
+
+ public CompletableFuture updateRecord(final FDBRecordStore recordStore,
+ Tuple primaryKey,
+ Function updateMessage) {
+ return recordStore.loadRecordAsync(primaryKey).thenAccept(existingRecord -> {
+ recordStore.saveRecord(updateMessage.apply(existingRecord.getRecord()));
+ });
+ }
+
@Nonnull
static Index addIndex(final boolean isSynthetic, final KeyExpression rootExpression,
final Map options, final RecordMetaDataBuilder metaDataBuilder) {
diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java
index d72aabb3c3..b6ba3bf270 100644
--- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java
+++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java
@@ -59,6 +59,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -112,11 +113,8 @@ void validate(Index index, final Map> expectedDocumentI
*/
void validate(Index index, final Map> expectedDocumentInformation,
final String universalSearch, final boolean allowDuplicatePrimaryKeys) throws IOException {
- final int partitionHighWatermark = Integer.parseInt(index.getOption(LuceneIndexOptions.INDEX_PARTITION_HIGH_WATERMARK));
- String partitionLowWaterMarkStr = index.getOption(LuceneIndexOptions.INDEX_PARTITION_LOW_WATERMARK);
- final int partitionLowWatermark = partitionLowWaterMarkStr == null ?
- Math.max(LucenePartitioner.DEFAULT_PARTITION_LOW_WATERMARK, 1) :
- Integer.parseInt(partitionLowWaterMarkStr);
+ final int partitionHighWatermark = getPartitionHighWatermark(index);
+ final int partitionLowWatermark = getPartitionLowWatermark(index);
Map> missingDocuments = new HashMap<>();
expectedDocumentInformation.forEach((groupingKey, groupedIds) -> {
@@ -128,71 +126,102 @@ void validate(Index index, final Map> expectedDocumentI
"group", groupingKey,
"expectedCount", entry.getValue().size()));
- final Set> entries = entry.getValue().entrySet();
- final List records = entries.stream()
- // I think in theory, this should be able to be:
- // Map.Entry.comparingByValue().thenComparing(Map.Entry.comparingByKey())
- // but I could not get the types to work
- .sorted((c1, c2) -> {
- final int valueComparison = c1.getValue().compareTo(c2.getValue());
- if (valueComparison != 0) {
- return valueComparison;
- } else {
- return c1.getKey().compareTo(c2.getKey());
- }
- })
+ final List records = entry.getValue().entrySet().stream()
+ .sorted(Map.Entry.comparingByValue().thenComparing(Map.Entry.comparingByKey()))
.map(Map.Entry::getKey)
.collect(Collectors.toList());
- List partitionInfos = getPartitionMeta(index, groupingKey);
- partitionInfos.sort(Comparator.comparing(info -> Tuple.fromBytes(info.getFrom().toByteArray())));
- final String allCounts = partitionInfos.stream()
- .map(info -> Tuple.fromBytes(info.getFrom().toByteArray()).toString() + info.getCount())
- .collect(Collectors.joining(",", "[", "]"));
- Set usedPartitionIds = new HashSet<>();
- Tuple lastToTuple = null;
- int visitedCount = 0;
+ if (partitionHighWatermark > 0) {
+ validatePartitionedGroup(index, universalSearch, allowDuplicatePrimaryKeys, groupingKey, partitionLowWatermark, partitionHighWatermark, records, missingDocuments);
+ } else {
+ validateUnpartitionedGroup(index, universalSearch, allowDuplicatePrimaryKeys, groupingKey, partitionLowWatermark, partitionHighWatermark, records, missingDocuments);
+ }
+ }
+ missingDocuments.entrySet().removeIf(entry -> entry.getValue().isEmpty());
+ assertEquals(Map.of(), missingDocuments, "We should have found all documents in the index");
+ }
- try (FDBRecordContext context = contextProvider.get()) {
- final FDBRecordStore recordStore = schemaSetup.apply(context);
+ private void validateUnpartitionedGroup(final Index index, final String universalSearch, final boolean allowDuplicatePrimaryKeys, final Tuple groupingKey, final int partitionLowWatermark, final int partitionHighWatermark, final List records, final Map> missingDocuments) throws IOException {
+ try (FDBRecordContext context = contextProvider.get()) {
+ final FDBRecordStore recordStore = schemaSetup.apply(context);
+ LOGGER.debug(KeyValueLogMessage.of("Visiting group",
+ "group", groupingKey,
+ "documentsInGroup", records.size()));
+ validateDocsInPartition(recordStore, index, null, groupingKey, Set.copyOf(records), universalSearch);
+ validatePrimaryKeySegmentIndex(recordStore, index, groupingKey, null,
+ Set.copyOf(records), allowDuplicatePrimaryKeys);
+ Set.copyOf(records).forEach(primaryKey -> missingDocuments.get(groupingKey).remove(primaryKey));
+ }
+ }
- for (int i = 0; i < partitionInfos.size(); i++) {
- final LucenePartitionInfoProto.LucenePartitionInfo partitionInfo = partitionInfos.get(i);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Group: " + groupingKey + " PartitionInfo[" + partitionInfo.getId() +
- "]: count:" + partitionInfo.getCount() + " " +
- Tuple.fromBytes(partitionInfo.getFrom().toByteArray()) + "-> " +
- Tuple.fromBytes(partitionInfo.getTo().toByteArray()));
- }
+ private void validatePartitionedGroup(final Index index, final String universalSearch, final boolean allowDuplicatePrimaryKeys, final Tuple groupingKey, final int partitionLowWatermark, final int partitionHighWatermark, final List records, final Map> missingDocuments) throws IOException {
+ List partitionInfos = getPartitionMeta(index, groupingKey);
+ partitionInfos.sort(Comparator.comparing(info -> Tuple.fromBytes(info.getFrom().toByteArray())));
+ final String allCounts = partitionInfos.stream()
+ .map(info -> Tuple.fromBytes(info.getFrom().toByteArray()).toString() + info.getCount())
+ .collect(Collectors.joining(",", "[", "]"));
+ Set usedPartitionIds = new HashSet<>();
+ Tuple lastToTuple = null;
+ int visitedCount = 0;
- assertTrue(isParititionCountWithinBounds(partitionInfos, i, partitionLowWatermark, partitionHighWatermark),
- "Group: " + groupingKey + " - " + allCounts + "\nlowWatermark: " + partitionLowWatermark + ", highWatermark: " + partitionHighWatermark +
- "\nCurrent count: " + partitionInfo.getCount());
- assertTrue(usedPartitionIds.add(partitionInfo.getId()), () -> "Duplicate id: " + partitionInfo);
- final Tuple fromTuple = Tuple.fromBytes(partitionInfo.getFrom().toByteArray());
- if (i > 0) {
- assertThat(fromTuple, greaterThan(lastToTuple));
- }
- lastToTuple = Tuple.fromBytes(partitionInfo.getTo().toByteArray());
- assertThat(fromTuple, lessThanOrEqualTo(lastToTuple));
+ try (FDBRecordContext context = contextProvider.get()) {
+ final FDBRecordStore recordStore = schemaSetup.apply(context);
+ for (int i = 0; i < partitionInfos.size(); i++) {
+ final LucenePartitionInfoProto.LucenePartitionInfo partitionInfo = partitionInfos.get(i);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Group: " + groupingKey + " PartitionInfo[" + partitionInfo.getId() +
+ "]: count:" + partitionInfo.getCount() + " " +
+ Tuple.fromBytes(partitionInfo.getFrom().toByteArray()) + "-> " +
+ Tuple.fromBytes(partitionInfo.getTo().toByteArray()));
+ }
- LOGGER.debug(KeyValueLogMessage.of("Visited partition",
- "group", groupingKey,
- "documentsSoFar", visitedCount,
- "documentsInGroup", records.size(),
- "partitionInfo.count", partitionInfo.getCount()));
- final Set expectedPrimaryKeys = Set.copyOf(records.subList(visitedCount, visitedCount + partitionInfo.getCount()));
- validateDocsInPartition(recordStore, index, partitionInfo.getId(), groupingKey,
- expectedPrimaryKeys,
- universalSearch);
- visitedCount += partitionInfo.getCount();
- validatePrimaryKeySegmentIndex(recordStore, index, groupingKey, partitionInfo.getId(),
- expectedPrimaryKeys, allowDuplicatePrimaryKeys);
- expectedPrimaryKeys.forEach(primaryKey -> missingDocuments.get(groupingKey).remove(primaryKey));
+ assertTrue(isParititionCountWithinBounds(partitionInfos, i, partitionLowWatermark, partitionHighWatermark),
+ "Group: " + groupingKey + " - " + allCounts + "\nlowWatermark: " + partitionLowWatermark + ", highWatermark: " + partitionHighWatermark +
+ "\nCurrent count: " + partitionInfo.getCount());
+ assertTrue(usedPartitionIds.add(partitionInfo.getId()), () -> "Duplicate id: " + partitionInfo);
+ final Tuple fromTuple = Tuple.fromBytes(partitionInfo.getFrom().toByteArray());
+ if (i > 0) {
+ assertThat(fromTuple, greaterThan(lastToTuple));
}
+ lastToTuple = Tuple.fromBytes(partitionInfo.getTo().toByteArray());
+ assertThat(fromTuple, lessThanOrEqualTo(lastToTuple));
+
+ LOGGER.debug(KeyValueLogMessage.of("Visited partition",
+ "group", groupingKey,
+ "documentsSoFar", visitedCount,
+ "documentsInGroup", records.size(),
+ "partitionInfo.count", partitionInfo.getCount()));
+ // if partitionInfo.getCount() is wrong, this can be very confusing, so a different assertion might be
+ // worthwhile
+ final Set expectedPrimaryKeys = Set.copyOf(records.subList(visitedCount,
+ visitedCount + partitionInfo.getCount()));
+ validateDocsInPartition(recordStore, index, partitionInfo.getId(), groupingKey,
+ expectedPrimaryKeys,
+ universalSearch);
+ visitedCount += partitionInfo.getCount();
+ assertThat(records.size(), greaterThanOrEqualTo(visitedCount));
+ validatePrimaryKeySegmentIndex(recordStore, index, groupingKey, partitionInfo.getId(),
+ expectedPrimaryKeys, allowDuplicatePrimaryKeys);
+ expectedPrimaryKeys.forEach(primaryKey -> missingDocuments.get(groupingKey).remove(primaryKey));
}
}
- missingDocuments.entrySet().removeIf(entry -> entry.getValue().isEmpty());
- assertEquals(Map.of(), missingDocuments, "We should have found all documents in the index");
+ }
+
+ private static int getPartitionLowWatermark(final Index index) {
+ String partitionLowWaterMarkStr = index.getOption(LuceneIndexOptions.INDEX_PARTITION_LOW_WATERMARK);
+ if (partitionLowWaterMarkStr == null) {
+ return Math.max(LucenePartitioner.DEFAULT_PARTITION_LOW_WATERMARK, 1);
+ } else {
+ return Integer.parseInt(partitionLowWaterMarkStr);
+ }
+ }
+
+ private static int getPartitionHighWatermark(final Index index) {
+ final String option = index.getOption(LuceneIndexOptions.INDEX_PARTITION_HIGH_WATERMARK);
+ if (option == null) {
+ return -1;
+ } else {
+ return Integer.parseInt(option);
+ }
}
List getPartitionMeta(Index index,
@@ -226,7 +255,8 @@ int getPartitionExtraCapacity(int count, int highWatermark) {
return Math.max(0, highWatermark - count);
}
- public static void validateDocsInPartition(final FDBRecordStore recordStore, Index index, int partitionId, Tuple groupingKey,
+ public static void validateDocsInPartition(final FDBRecordStore recordStore, Index index,
+ @Nullable Integer partitionId, Tuple groupingKey,
Set expectedPrimaryKeys, final String universalSearch) throws IOException {
LuceneScanQuery scanQuery;
if (groupingKey.isEmpty()) {
@@ -262,7 +292,7 @@ public static void validateDocsInPartition(final FDBRecordStore recordStore, Ind
}
public static IndexReader getIndexReader(final FDBRecordStore recordStore, final Index index,
- final Tuple groupingKey, final int partitionId) throws IOException {
+ final Tuple groupingKey, @Nullable final Integer partitionId) throws IOException {
final FDBDirectoryManager manager = getDirectoryManager(recordStore, index);
return manager.getIndexReader(groupingKey, partitionId);
}
From e666678f763ebfdb36e04379e182df8d73c59ccc Mon Sep 17 00:00:00 2001
From: Scott Dugas
Date: Wed, 11 Dec 2024 18:11:37 -0500
Subject: [PATCH 2/2] Add comment explaining test purpose
---
.../record/lucene/LuceneIndexMaintenanceTest.java | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
index 9bb8dc845d..bdbd39a2da 100644
--- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
+++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
@@ -784,6 +784,15 @@ public void ensureValid() throws IOException {
}
}
+ /**
+ * Test that updating records in the same transaction does not cause issues with the executor, and doesn't result in
+ * a corrupted index.
+ *
+ * See issues: #2989 and
+ * #2990.
+ *
+ * @throws IOException if there's an issue with Lucene
+ */
@Test
void concurrentUpdate() throws IOException {
// Once the two issues noted below are fixed, we should make this parameterized, and run with additional random