Skip to content

Commit

Permalink
Add a test that does concurrent lucene updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ScottDugas committed Dec 6, 2024
1 parent b9fb711 commit c694659
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.IdentityHashMap;
Expand All @@ -86,6 +87,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -781,6 +784,85 @@ public void ensureValid() throws IOException {
}
}

/**
 * Populates a grouped, non-synthetic Lucene index over several save-and-merge rounds, then updates every
 * updateable record through a pipelined cursor inside a single transaction, and finally validates the index
 * (and, because the data is grouped, the delete-where behavior).
 *
 * <p>Once the two issues referenced in the body are fixed, this should become a parameterized test that also
 * runs with additional random configurations.</p>
 */
@Test
void concurrentUpdate() throws IOException {
    // Synchronization blocks in FDBDirectoryWrapper can cause a deadlock,
    // see https://github.com/FoundationDB/fdb-record-layer/issues/2989
    // A pool of 30 workers is large enough to get past that; the workers are named so they are easy to spot.
    final AtomicInteger workerIndex = new AtomicInteger();
    final ForkJoinPool.ForkJoinWorkerThreadFactory namedWorkers = pool -> {
        final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        worker.setName("ConcurrentUpdatePool-" + workerIndex.getAndIncrement());
        return worker;
    };
    this.dbExtension.getDatabaseFactory().setExecutor(new ForkJoinPool(30, namedWorkers, null, false));

    final long seed = 320947L;
    final boolean isGrouped = true;
    // Updating the parent and the child concurrently is not thread safe. We may want to fix that behavior, or
    // declare it unsupported: it amounts to updating the same record concurrently, which is probably not
    // generally supported.
    final boolean isSynthetic = false;
    final boolean primaryKeySegmentIndexEnabled = true;
    // LucenePartitioner is not thread safe, and the counts get broken
    // See: https://github.com/FoundationDB/fdb-record-layer/issues/2990
    final int partitionHighWatermark = -1;
    final LuceneIndexTestDataModel dataModel =
            new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager)
                    .setIsGrouped(isGrouped)
                    .setIsSynthetic(isSynthetic)
                    .setPrimaryKeySegmentIndexEnabled(primaryKeySegmentIndexEnabled)
                    .setPartitionHighWatermark(partitionHighWatermark)
                    .build();

    final int repartitionCount = 10;
    final int loopCount = 30;
    // Same starting value for every round of saves.
    final long start = 234098;

    final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder()
            .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, repartitionCount)
            .addProp(LuceneRecordContextProperties.LUCENE_MAX_DOCUMENTS_TO_MOVE_DURING_REPARTITIONING,
                    dataModel.nextInt(1000) + repartitionCount)
            // the segments-per-tier value must be at least 2.0
            .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER,
                    (double)dataModel.nextInt(10) + 2)
            .build();

    // Phase 1: build up the data set, merging the index explicitly after every committed batch.
    for (int iteration = 0; iteration < loopCount; iteration++) {
        final int groupCount = dataModel.groupingKeyToPrimaryKeyToPartitionKey.size();
        final int docCount = dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream()
                .mapToInt(Map::size)
                .sum();
        LOGGER.info(KeyValueLogMessage.of("concurrentUpdate loop",
                "iteration", iteration,
                "groupCount", groupCount,
                "docCount", docCount,
                "docMinPerGroup", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).min(),
                "docMaxPerGroup", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).max()));

        try (FDBRecordContext context = openContext(contextProps)) {
            dataModel.saveRecords(10, start, context, 1);
            commit(context);
        }
        explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);
    }

    // Phase 2: update all updateable records in one transaction, up to 10 updates in flight at a time.
    try (FDBRecordContext context = openContext(contextProps)) {
        final FDBRecordStore recordStore = Objects.requireNonNull(dataModel.schemaSetup.apply(context));
        recordStore.getIndexDeferredMaintenanceControl().setAutoMergeDuringCommit(false);
        assertThat(dataModel.updateableRecords, Matchers.aMapWithSize(Matchers.greaterThan(30)));
        LOGGER.info("concurrentUpdate: Starting updates");
        // Iterate over a snapshot of the entries rather than over the live map.
        RecordCursor.fromList(new ArrayList<>(dataModel.updateableRecords.entrySet()))
                .mapPipelined(entry -> dataModel.updateRecord(recordStore, entry.getKey(), entry.getValue()), 10)
                .asList()
                .join();
        commit(context);
    }

    // Phase 3: validate the index contents against the model.
    final String validationQuery = isSynthetic ? "child_str_value:forth" : "text_value:about";
    final LuceneIndexTestValidator luceneIndexTestValidator = new LuceneIndexTestValidator(
            () -> openContext(contextProps),
            context -> Objects.requireNonNull(dataModel.schemaSetup.apply(context)));
    luceneIndexTestValidator.validate(dataModel.index, dataModel.groupingKeyToPrimaryKeyToPartitionKey, validationQuery);

    if (isGrouped) {
        validateDeleteWhere(isSynthetic, dataModel.groupingKeyToPrimaryKeyToPartitionKey, contextProps,
                dataModel.schemaSetup, dataModel.index);
    }
}

static Stream<Arguments> concurrentStoreTest() {
return Stream.concat(
Stream.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
import com.apple.foundationdb.record.test.TestKeySpace;
import com.apple.foundationdb.record.test.TestKeySpacePathManagerExtension;
import com.apple.foundationdb.tuple.Tuple;
import com.google.protobuf.Message;

import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
Expand All @@ -64,6 +66,7 @@ public class LuceneIndexTestDataModel {
* </p>
*/
final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey;
Map<Tuple, Function<Message, Message>> updateableRecords;

private LuceneIndexTestDataModel(final Builder builder) {
random = builder.random;
Expand Down Expand Up @@ -92,6 +95,7 @@ private LuceneIndexTestDataModel(final Builder builder) {
return store;
};
groupingKeyToPrimaryKeyToPartitionKey = new HashMap<>();
updateableRecords = new HashMap<>();
}

@Override
Expand Down Expand Up @@ -121,18 +125,25 @@ void saveRecords(int count, long start, FDBRecordContext context, final int grou
FDBRecordStore recordStore = Objects.requireNonNull(schemaSetup.apply(context));
for (int j = 0; j < count; j++) {
LuceneIndexTestDataModel.saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey,
textGenerator, start, recordStore, group);
textGenerator, start, recordStore, group, updateableRecords);
}
}

/**
 * Convenience overload for callers that do not need to track updateable records: delegates to the
 * overload that takes an {@code updateableRecords} map, handing it a fresh map that is not retained.
 */
static void saveRecord(final boolean isGrouped, final boolean isSynthetic, final Random random,
                       final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey,
                       final RandomTextGenerator textGenerator, final long start, final FDBRecordStore recordStore,
                       final int group) {
    // Nothing reads this map afterwards; it only satisfies the delegate's signature.
    final Map<Tuple, Function<Message, Message>> discardedUpdates = new HashMap<>();
    saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey, textGenerator, start,
            recordStore, group, discardedUpdates);
}

/**
 * Saves one record for the given group and records where it went in
 * {@code groupingKeyToPrimaryKeyToPartitionKey}.
 *
 * @param isGrouped whether records are keyed by {@code group}; when false, the empty tuple is used as the grouping key
 * @param isSynthetic passed through to the record-level {@code saveRecord} overload
 * @param random source of the timestamp jitter; also passed through to the record-level overload
 * @param groupingKeyToPrimaryKeyToPartitionKey map from grouping key to (primary key to partition key); updated in place
 * @param textGenerator passed through to the record-level {@code saveRecord} overload
 * @param start base value for the generated timestamp
 * @param recordStore the store to save into
 * @param group the group number of the record
 * @param updateableRecords passed through to the record-level {@code saveRecord} overload
 */
static void saveRecord(final boolean isGrouped, final boolean isSynthetic, final Random random,
final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey,
final RandomTextGenerator textGenerator, final long start, final FDBRecordStore recordStore,
final int group, final Map<Tuple, Function<Message, Message>> updateableRecords) {
final Tuple groupTuple = isGrouped ? Tuple.from(group) : Tuple.from();
// Number of records already tracked for this group (the inner map is created on first use).
final int countInGroup = groupingKeyToPrimaryKeyToPartitionKey.computeIfAbsent(groupTuple, key -> new HashMap<>()).size();
// Jitter of -5..14 around start + countInGroup, so timestamps are not strictly increasing.
long timestamp = start + countInGroup + random.nextInt(20) - 5;
// NOTE(review): the next two lines both declare primaryKey; they look like the removed and added versions
// of a single statement shown together by the diff view -- confirm only the second one is in the real file.
final Tuple primaryKey = saveRecord(recordStore, isSynthetic, group, countInGroup, timestamp, textGenerator, random);
final Tuple primaryKey = saveRecord(recordStore, isSynthetic, group, countInGroup, timestamp, textGenerator, random, updateableRecords);
// The partition key is the timestamp followed by the elements of the primary key.
groupingKeyToPrimaryKeyToPartitionKey.computeIfAbsent(groupTuple, key -> new HashMap<>())
.put(primaryKey, Tuple.from(timestamp).addAll(primaryKey));
}
Expand All @@ -144,7 +155,8 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
final int countInGroup,
final long timestamp,
final RandomTextGenerator textGenerator,
final Random random) {
final Random random,
final Map<Tuple, Function<Message, Message>> updateableRecords) {
var parent = TestRecordsGroupedParentChildProto.MyParentRecord.newBuilder()
.setGroup(group)
.setRecNo(1001L + countInGroup)
Expand All @@ -155,6 +167,8 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
.setChildRecNo(1000L - countInGroup)
.build();
Tuple primaryKey;
final Tuple parentPrimaryKey = recordStore.saveRecord(parent).getPrimaryKey();
updateableRecords.put(parentPrimaryKey, existingRecord -> updateParentRecord(existingRecord, random));
if (isSynthetic) {
var child = TestRecordsGroupedParentChildProto.MyChildRecord.newBuilder()
.setGroup(group)
Expand All @@ -165,15 +179,40 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
final Tuple syntheticRecordTypeKey = recordStore.getRecordMetaData()
.getSyntheticRecordType("JoinChildren")
.getRecordTypeKeyTuple();
final Tuple childPrimaryKey = recordStore.saveRecord(child).getPrimaryKey();
updateableRecords.put(childPrimaryKey, existingRecord -> updateChildRecord(existingRecord, random));
primaryKey = Tuple.from(syntheticRecordTypeKey.getItems().get(0),
recordStore.saveRecord(parent).getPrimaryKey().getItems(),
recordStore.saveRecord(child).getPrimaryKey().getItems());
parentPrimaryKey.getItems(),
childPrimaryKey.getItems());
} else {
primaryKey = recordStore.saveRecord(parent).getPrimaryKey();
primaryKey = parentPrimaryKey;
}
return primaryKey;
}


/**
 * Produces an updated version of a parent record: all fields are copied from {@code existingRecord} and
 * the int value is then overwritten with the next random int.
 *
 * @param existingRecord the record to copy from
 * @param random source of the new int value
 * @return a newly built {@code MyParentRecord}
 */
private static Message updateParentRecord(Message existingRecord, final Random random) {
    return TestRecordsGroupedParentChildProto.MyParentRecord.newBuilder()
            .mergeFrom(existingRecord)
            .setIntValue(random.nextInt())
            .build();
}

/**
 * Produces an updated version of a child record: all fields are copied from {@code existingRecord} and
 * the other value is then overwritten with the next random int.
 *
 * @param existingRecord the record to copy from
 * @param random source of the new value
 * @return a newly built {@code MyChildRecord}
 */
private static Message updateChildRecord(final Message existingRecord, final Random random) {
    return TestRecordsGroupedParentChildProto.MyChildRecord.newBuilder()
            .mergeFrom(existingRecord)
            .setOtherValue(random.nextInt())
            .build();
}

/**
 * Loads the record with the given primary key, applies {@code updateMessage} to its message and saves the
 * result back to the same store.
 *
 * @param recordStore the store to load from and save to
 * @param primaryKey primary key of the record to update
 * @param updateMessage function producing the replacement message from the loaded one
 * @return a future that completes once the replacement has been saved
 */
public CompletableFuture<Void> updateRecord(final FDBRecordStore recordStore,
                                            Tuple primaryKey,
                                            Function<Message, Message> updateMessage) {
    return recordStore.loadRecordAsync(primaryKey).thenAccept(stored -> {
        // NOTE(review): assumes the record exists; a missing record would presumably surface here as a
        // NullPointerException inside the returned future -- confirm against loadRecordAsync's contract.
        final Message replacement = updateMessage.apply(stored.getRecord());
        // NOTE(review): saveRecord is the non-async variant and runs inside the load callback -- confirm
        // that is intended for this test.
        recordStore.saveRecord(replacement);
    });
}

@Nonnull
static Index addIndex(final boolean isSynthetic, final KeyExpression rootExpression,
final Map<String, String> options, final RecordMetaDataBuilder metaDataBuilder) {
Expand Down
Loading

0 comments on commit c694659

Please sign in to comment.