Skip to content

Commit

Permalink
Merge pull request #2996 from ScottDugas/concurrent-lucene-updates
Browse files Browse the repository at this point in the history
Add a test that does concurrent lucene updates
  • Loading branch information
ScottDugas authored Dec 12, 2024
2 parents 5f21535 + bc88d46 commit 93d4b97
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.IdentityHashMap;
Expand All @@ -86,6 +87,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -781,6 +784,94 @@ public void ensureValid() throws IOException {
}
}

/**
* Test that updating records in the same transaction does not cause issues with the executor, and doesn't result in
* a corrupted index.
* <p>
* See issues: <a href="https://github.com/FoundationDB/fdb-record-layer/issues/2989">#2989</a> and
* <a href="https://github.com/FoundationDB/fdb-record-layer/issues/2990">#2990</a>.
* </p>
* @throws IOException if there's an issue with Lucene
*/
@Test
void concurrentUpdate() throws IOException {
// Once the two issues noted below are fixed, we should make this parameterized, and run with additional random
// configurations.
AtomicInteger threadCounter = new AtomicInteger();
// Synchronization blocks in FDBDirectoryWrapper can cause a deadlock
// see https://github.com/FoundationDB/fdb-record-layer/issues/2989
// So set the pool large enough to overcome that
this.dbExtension.getDatabaseFactory().setExecutor(new ForkJoinPool(30,
pool -> {
final ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setName("ConcurrentUpdatePool-" + threadCounter.getAndIncrement());
return thread;
},
null, false));
final long seed = 320947L;
final boolean isGrouped = true;
// updating the parent & child concurrently is not thread safe, we may want to fix the behavior, or say that is
// not supported, as it is the same as updating the same record concurrently, which I don't think is generally
// supported.
final boolean isSynthetic = false;
final boolean primaryKeySegmentIndexEnabled = true;
// LucenePartitioner is not thread safe, and the counts get broken
// See: https://github.com/FoundationDB/fdb-record-layer/issues/2990
final int partitionHighWatermark = -1;
final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager)
.setIsGrouped(isGrouped)
.setIsSynthetic(isSynthetic)
.setPrimaryKeySegmentIndexEnabled(primaryKeySegmentIndexEnabled)
.setPartitionHighWatermark(partitionHighWatermark)
.build();

final int repartitionCount = 10;
final int loopCount = 30;

final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder()
.addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, repartitionCount)
.addProp(LuceneRecordContextProperties.LUCENE_MAX_DOCUMENTS_TO_MOVE_DURING_REPARTITIONING, dataModel.nextInt(1000) + repartitionCount)
.addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0
.build();
for (int i = 0; i < loopCount; i++) {
LOGGER.info(KeyValueLogMessage.of("concurrentUpdate loop",
"iteration", i,
"groupCount", dataModel.groupingKeyToPrimaryKeyToPartitionKey.size(),
"docCount", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).sum(),
"docMinPerGroup", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).min(),
"docMaxPerGroup", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).max()));

long start = 234098;
try (FDBRecordContext context = openContext(contextProps)) {
dataModel.saveRecords(10, start, context, 1);
commit(context);
}
explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);
}


try (FDBRecordContext context = openContext(contextProps)) {
FDBRecordStore recordStore = Objects.requireNonNull(dataModel.schemaSetup.apply(context));
recordStore.getIndexDeferredMaintenanceControl().setAutoMergeDuringCommit(false);
assertThat(dataModel.updateableRecords, Matchers.aMapWithSize(Matchers.greaterThan(30)));
LOGGER.info("concurrentUpdate: Starting updates");
RecordCursor.fromList(new ArrayList<>(dataModel.updateableRecords.entrySet()))
.mapPipelined(entry -> {
return dataModel.updateRecord(recordStore, entry.getKey(), entry.getValue());
}, 10)
.asList().join();
commit(context);
}


final LuceneIndexTestValidator luceneIndexTestValidator = new LuceneIndexTestValidator(() -> openContext(contextProps), context -> Objects.requireNonNull(dataModel.schemaSetup.apply(context)));
luceneIndexTestValidator.validate(dataModel.index, dataModel.groupingKeyToPrimaryKeyToPartitionKey, isSynthetic ? "child_str_value:forth" : "text_value:about");

if (isGrouped) {
validateDeleteWhere(isSynthetic, dataModel.groupingKeyToPrimaryKeyToPartitionKey, contextProps, dataModel.schemaSetup, dataModel.index);
}
}

static Stream<Arguments> concurrentStoreTest() {
return Stream.concat(
Stream.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.apple.foundationdb.record.test.TestKeySpace;
import com.apple.foundationdb.record.test.TestKeySpacePathManagerExtension;
import com.apple.foundationdb.tuple.Tuple;
import com.google.protobuf.Message;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -42,6 +43,7 @@
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
Expand All @@ -67,6 +69,7 @@ public class LuceneIndexTestDataModel {
* </p>
*/
final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey;
Map<Tuple, Function<Message, Message>> updateableRecords;

private LuceneIndexTestDataModel(final Builder builder) {
random = builder.random;
Expand Down Expand Up @@ -95,6 +98,7 @@ private LuceneIndexTestDataModel(final Builder builder) {
return store;
};
groupingKeyToPrimaryKeyToPartitionKey = new HashMap<>();
updateableRecords = new HashMap<>();
}

@Override
Expand Down Expand Up @@ -128,42 +132,40 @@ public void deleteRecord(final FDBRecordContext context, final Tuple primaryKey)
void saveRecords(int count, long start, FDBRecordContext context, final int group) {
FDBRecordStore recordStore = createOrOpenRecordStore(context);
for (int j = 0; j < count; j++) {
LuceneIndexTestDataModel.saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey,
LuceneIndexTestDataModel.saveRecord(
isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey, updateableRecords, true,
textGenerator, start, recordStore, group);
}
}

public Tuple saveEmptyRecord(final boolean isGrouped, final boolean isSynthetic,
final long start, final FDBRecordStore recordStore, final int group) {
return saveEmptyRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey, start, recordStore, group);
return saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey, updateableRecords, false,
null, start, recordStore, group);
}

static Tuple saveEmptyRecord(final boolean isGrouped, final boolean isSynthetic, final Random random,
final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey,
final long start, final FDBRecordStore recordStore, final int group) {
return saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey, false, null, start, recordStore, group);
}

static Tuple saveRecord(final boolean isGrouped, final boolean isSynthetic, final Random random,
final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey,
final RandomTextGenerator textGenerator, final long start, final FDBRecordStore recordStore,
final int group) {
return saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey, true, textGenerator, start, recordStore, group);
static void saveRecord(final boolean isGrouped, final boolean isSynthetic, final Random random,
final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey,
final RandomTextGenerator textGenerator, final long start, final FDBRecordStore recordStore,
final int group) {
saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey, new HashMap<>(), true, textGenerator, start, recordStore, group);
}

public Tuple saveRecord(final boolean isGrouped, final boolean isSynthetic, final long start,
final FDBRecordStore recordStore, final int group) {
return saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey, true, textGenerator, start, recordStore, group);
return saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey, updateableRecords,
true, textGenerator, start, recordStore, group);
}

static Tuple saveRecord(final boolean isGrouped, final boolean isSynthetic, final Random random,
final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey,
final boolean withContent, final RandomTextGenerator textGenerator, final long start, final FDBRecordStore recordStore,
final int group) {
private static Tuple saveRecord(final boolean isGrouped, final boolean isSynthetic, final Random random,
final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey,
final Map<Tuple, Function<Message, Message>> updateableRecords,
final boolean withContent, final RandomTextGenerator textGenerator, final long start, final FDBRecordStore recordStore,
final int group) {
final Tuple groupTuple = calculateGroupTuple(isGrouped, group);
final int countInGroup = groupingKeyToPrimaryKeyToPartitionKey.computeIfAbsent(groupTuple, key -> new HashMap<>()).size();
long timestamp = start + countInGroup + random.nextInt(20) - 5;
final Tuple primaryKey = saveRecord(recordStore, isSynthetic, group, countInGroup, timestamp, withContent, textGenerator, random);
final Tuple primaryKey = saveRecord(recordStore, isSynthetic, group, countInGroup, timestamp, withContent, textGenerator, random, updateableRecords);
groupingKeyToPrimaryKeyToPartitionKey.computeIfAbsent(groupTuple, key -> new HashMap<>())
.put(primaryKey, Tuple.from(timestamp).addAll(primaryKey));
return primaryKey;
Expand All @@ -177,7 +179,8 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
final long timestamp,
final boolean withContent,
final @Nullable RandomTextGenerator textGenerator,
final @Nullable Random random) {
final @Nullable Random random,
final Map<Tuple, Function<Message, Message>> updateableRecords) {
var parentBuilder = TestRecordsGroupedParentChildProto.MyParentRecord.newBuilder()
.setGroup(group)
.setRecNo(1001L + countInGroup)
Expand All @@ -191,6 +194,8 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
}
var parent = parentBuilder.build();
Tuple primaryKey;
final Tuple parentPrimaryKey = recordStore.saveRecord(parent).getPrimaryKey();
updateableRecords.put(parentPrimaryKey, existingRecord -> updateParentRecord(existingRecord, random));
if (isSynthetic) {
var childBuilder = TestRecordsGroupedParentChildProto.MyChildRecord.newBuilder()
.setGroup(group)
Expand All @@ -204,15 +209,40 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
final Tuple syntheticRecordTypeKey = recordStore.getRecordMetaData()
.getSyntheticRecordType("JoinChildren")
.getRecordTypeKeyTuple();
final Tuple childPrimaryKey = recordStore.saveRecord(child).getPrimaryKey();
updateableRecords.put(childPrimaryKey, existingRecord -> updateChildRecord(existingRecord, random));
primaryKey = Tuple.from(syntheticRecordTypeKey.getItems().get(0),
recordStore.saveRecord(parent).getPrimaryKey().getItems(),
recordStore.saveRecord(child).getPrimaryKey().getItems());
parentPrimaryKey.getItems(),
childPrimaryKey.getItems());
} else {
primaryKey = recordStore.saveRecord(parent).getPrimaryKey();
primaryKey = parentPrimaryKey;
}
return primaryKey;
}


private static Message updateParentRecord(Message existingRecord, final Random random) {
final var builder = TestRecordsGroupedParentChildProto.MyParentRecord.newBuilder();
builder.mergeFrom(existingRecord);
builder.setIntValue(random.nextInt());
return builder.build();
}

private static Message updateChildRecord(final Message existingRecord, final Random random) {
final var builder = TestRecordsGroupedParentChildProto.MyChildRecord.newBuilder();
builder.mergeFrom(existingRecord);
builder.setOtherValue(random.nextInt());
return builder.build();
}

public CompletableFuture<Void> updateRecord(final FDBRecordStore recordStore,
Tuple primaryKey,
Function<Message, Message> updateMessage) {
return recordStore.loadRecordAsync(primaryKey).thenAccept(existingRecord -> {
recordStore.saveRecord(updateMessage.apply(existingRecord.getRecord()));
});
}

@Nonnull
static Index addIndex(final boolean isSynthetic, final KeyExpression rootExpression,
final Map<String, String> options, final RecordMetaDataBuilder metaDataBuilder) {
Expand Down
Loading

0 comments on commit 93d4b97

Please sign in to comment.