Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a test that does concurrent lucene updates #2996

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.IdentityHashMap;
Expand All @@ -86,6 +87,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -781,6 +784,85 @@ public void ensureValid() throws IOException {
}
}

@Test
void concurrentUpdate() throws IOException {
ohadzeliger marked this conversation as resolved.
Show resolved Hide resolved
// Once the two issues noted below are fixed, we should make this parameterized, and run with additional random
// configurations.
AtomicInteger threadCounter = new AtomicInteger();
// Synchronization blocks in FDBDirectoryWrapper can cause a deadlock
// see https://github.com/FoundationDB/fdb-record-layer/issues/2989
// So set the pool large enough to overcome that
this.dbExtension.getDatabaseFactory().setExecutor(new ForkJoinPool(30,
ohadzeliger marked this conversation as resolved.
Show resolved Hide resolved
pool -> {
final ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setName("ConcurrentUpdatePool-" + threadCounter.getAndIncrement());
return thread;
},
null, false));
final long seed = 320947L;
final boolean isGrouped = true;
// updating the parent & child concurrently is not thread safe, we may want to fix the behavior, or say that is
// not supported, as it is the same as updating the same record concurrently, which I don't think is generally
// supported.
final boolean isSynthetic = false;
final boolean primaryKeySegmentIndexEnabled = true;
// LucenePartitioner is not thread safe, and the counts get broken
// See: https://github.com/FoundationDB/fdb-record-layer/issues/2990
final int partitionHighWatermark = -1;
final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager)
.setIsGrouped(isGrouped)
.setIsSynthetic(isSynthetic)
.setPrimaryKeySegmentIndexEnabled(primaryKeySegmentIndexEnabled)
.setPartitionHighWatermark(partitionHighWatermark)
.build();

final int repartitionCount = 10;
final int loopCount = 30;

final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder()
.addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, repartitionCount)
.addProp(LuceneRecordContextProperties.LUCENE_MAX_DOCUMENTS_TO_MOVE_DURING_REPARTITIONING, dataModel.nextInt(1000) + repartitionCount)
.addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0
.build();
for (int i = 0; i < loopCount; i++) {
LOGGER.info(KeyValueLogMessage.of("concurrentUpdate loop",
"iteration", i,
"groupCount", dataModel.groupingKeyToPrimaryKeyToPartitionKey.size(),
"docCount", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).sum(),
"docMinPerGroup", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).min(),
"docMaxPerGroup", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).max()));

long start = 234098;
ohadzeliger marked this conversation as resolved.
Show resolved Hide resolved
try (FDBRecordContext context = openContext(contextProps)) {
dataModel.saveRecords(10, start, context, 1);
commit(context);
}
explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);
}


try (FDBRecordContext context = openContext(contextProps)) {
FDBRecordStore recordStore = Objects.requireNonNull(dataModel.schemaSetup.apply(context));
recordStore.getIndexDeferredMaintenanceControl().setAutoMergeDuringCommit(false);
assertThat(dataModel.updateableRecords, Matchers.aMapWithSize(Matchers.greaterThan(30)));
LOGGER.info("concurrentUpdate: Starting updates");
RecordCursor.fromList(new ArrayList<>(dataModel.updateableRecords.entrySet()))
.mapPipelined(entry -> {
return dataModel.updateRecord(recordStore, entry.getKey(), entry.getValue());
}, 10)
.asList().join();
commit(context);
}


final LuceneIndexTestValidator luceneIndexTestValidator = new LuceneIndexTestValidator(() -> openContext(contextProps), context -> Objects.requireNonNull(dataModel.schemaSetup.apply(context)));
luceneIndexTestValidator.validate(dataModel.index, dataModel.groupingKeyToPrimaryKeyToPartitionKey, isSynthetic ? "child_str_value:forth" : "text_value:about");

if (isGrouped) {
validateDeleteWhere(isSynthetic, dataModel.groupingKeyToPrimaryKeyToPartitionKey, contextProps, dataModel.schemaSetup, dataModel.index);
}
}

static Stream<Arguments> concurrentStoreTest() {
return Stream.concat(
Stream.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
import com.apple.foundationdb.record.test.TestKeySpace;
import com.apple.foundationdb.record.test.TestKeySpacePathManagerExtension;
import com.apple.foundationdb.tuple.Tuple;
import com.google.protobuf.Message;

import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
Expand All @@ -64,6 +66,7 @@ public class LuceneIndexTestDataModel {
* </p>
*/
final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey;
Map<Tuple, Function<Message, Message>> updateableRecords;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems very specific and limited - won't it make sense to store the records as "saved records" and handle the update-specific logic in the test class rather than at the model?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be hyper-focused on the maintenance tests which have less interesting record shapes, but I feel like, if we want to pull this out, I would also want to abstract away the types a little bit, so that the code above doesn't have to worry about whether it is synthetic or not.
Or perhaps, that the Function here should be replaced with an object, that has a variety of operations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the sake of making progress, then, would you like to check this in as-is and postpone these improvements to a later pr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to be an object in #3003 but I think there is definitely room to grow, and polish there.


private LuceneIndexTestDataModel(final Builder builder) {
random = builder.random;
Expand Down Expand Up @@ -92,6 +95,7 @@ private LuceneIndexTestDataModel(final Builder builder) {
return store;
};
groupingKeyToPrimaryKeyToPartitionKey = new HashMap<>();
updateableRecords = new HashMap<>();
}

@Override
Expand Down Expand Up @@ -121,18 +125,25 @@ void saveRecords(int count, long start, FDBRecordContext context, final int grou
FDBRecordStore recordStore = Objects.requireNonNull(schemaSetup.apply(context));
for (int j = 0; j < count; j++) {
LuceneIndexTestDataModel.saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey,
textGenerator, start, recordStore, group);
textGenerator, start, recordStore, group, updateableRecords);
}
}

/**
 * Save one randomized record without tracking it for later updates.
 * Convenience overload of the nine-argument {@code saveRecord} for callers that do not
 * need the update-tracking map.
 */
static void saveRecord(final boolean isGrouped, final boolean isSynthetic, final Random random,
                       final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey,
                       final RandomTextGenerator textGenerator, final long start, final FDBRecordStore recordStore,
                       final int group) {
    // The throwaway map simply discards the update callbacks the full overload records.
    final Map<Tuple, Function<Message, Message>> discardedUpdates = new HashMap<>();
    saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey,
            textGenerator, start, recordStore, group, discardedUpdates);
}

/**
 * Save one randomized record, registering its primary key in
 * {@code groupingKeyToPrimaryKeyToPartitionKey} and an update callback in
 * {@code updateableRecords}.
 *
 * <p>Fix: the group's inner map was looked up twice via {@code computeIfAbsent} (once for
 * the size, once for the put); it is now fetched once into a local.</p>
 */
static void saveRecord(final boolean isGrouped, final boolean isSynthetic, final Random random,
                       final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey,
                       final RandomTextGenerator textGenerator, final long start, final FDBRecordStore recordStore,
                       final int group, final Map<Tuple, Function<Message, Message>> updateableRecords) {
    // Ungrouped indexes use the empty tuple as their single "group".
    final Tuple groupTuple = isGrouped ? Tuple.from(group) : Tuple.from();
    final Map<Tuple, Tuple> primaryKeyToPartitionKey =
            groupingKeyToPrimaryKeyToPartitionKey.computeIfAbsent(groupTuple, key -> new HashMap<>());
    final int countInGroup = primaryKeyToPartitionKey.size();
    // Jittered timestamp: monotonic-ish with the group count, plus random noise in [-5, 14].
    long timestamp = start + countInGroup + random.nextInt(20) - 5;
    final Tuple primaryKey = saveRecord(recordStore, isSynthetic, group, countInGroup, timestamp, textGenerator, random, updateableRecords);
    // Partition key is (timestamp, primaryKey...), matching the partitioner's sort order.
    primaryKeyToPartitionKey.put(primaryKey, Tuple.from(timestamp).addAll(primaryKey));
}
Expand All @@ -144,7 +155,8 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
final int countInGroup,
final long timestamp,
final RandomTextGenerator textGenerator,
final Random random) {
final Random random,
final Map<Tuple, Function<Message, Message>> updateableRecords) {
var parent = TestRecordsGroupedParentChildProto.MyParentRecord.newBuilder()
.setGroup(group)
.setRecNo(1001L + countInGroup)
Expand All @@ -155,6 +167,8 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
.setChildRecNo(1000L - countInGroup)
.build();
Tuple primaryKey;
final Tuple parentPrimaryKey = recordStore.saveRecord(parent).getPrimaryKey();
updateableRecords.put(parentPrimaryKey, existingRecord -> updateParentRecord(existingRecord, random));
if (isSynthetic) {
var child = TestRecordsGroupedParentChildProto.MyChildRecord.newBuilder()
.setGroup(group)
Expand All @@ -165,15 +179,40 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
final Tuple syntheticRecordTypeKey = recordStore.getRecordMetaData()
.getSyntheticRecordType("JoinChildren")
.getRecordTypeKeyTuple();
final Tuple childPrimaryKey = recordStore.saveRecord(child).getPrimaryKey();
updateableRecords.put(childPrimaryKey, existingRecord -> updateChildRecord(existingRecord, random));
primaryKey = Tuple.from(syntheticRecordTypeKey.getItems().get(0),
recordStore.saveRecord(parent).getPrimaryKey().getItems(),
recordStore.saveRecord(child).getPrimaryKey().getItems());
parentPrimaryKey.getItems(),
childPrimaryKey.getItems());
} else {
primaryKey = recordStore.saveRecord(parent).getPrimaryKey();
primaryKey = parentPrimaryKey;
}
return primaryKey;
}


/**
 * Produce an updated copy of a parent record with its {@code int_value} field re-randomized;
 * all other fields are carried over from {@code existingRecord}.
 */
private static Message updateParentRecord(Message existingRecord, final Random random) {
    return TestRecordsGroupedParentChildProto.MyParentRecord.newBuilder()
            .mergeFrom(existingRecord)
            .setIntValue(random.nextInt())
            .build();
}

/**
 * Produce an updated copy of a child record with its {@code other_value} field re-randomized;
 * all other fields are carried over from {@code existingRecord}.
 */
private static Message updateChildRecord(final Message existingRecord, final Random random) {
    return TestRecordsGroupedParentChildProto.MyChildRecord.newBuilder()
            .mergeFrom(existingRecord)
            .setOtherValue(random.nextInt())
            .build();
}

/**
 * Asynchronously load the record at {@code primaryKey}, apply {@code updateMessage} to its
 * protobuf, and save the result back to the store.
 *
 * <p>Fix: the loaded record is now null-checked. Previously a missing record (e.g. deleted by
 * a concurrent transaction — this method is used from a pipelined concurrent-update test)
 * produced an anonymous {@link NullPointerException} inside the lambda with no indication of
 * which key failed.</p>
 *
 * @param recordStore   store to load from and save to
 * @param primaryKey    primary key of the record to update
 * @param updateMessage pure transformation from the existing message to the updated one
 * @return a future completing when the updated record has been saved
 */
public CompletableFuture<Void> updateRecord(final FDBRecordStore recordStore,
                                            Tuple primaryKey,
                                            Function<Message, Message> updateMessage) {
    return recordStore.loadRecordAsync(primaryKey).thenAccept(existingRecord -> {
        Objects.requireNonNull(existingRecord,
                "concurrentUpdate: no record found for primary key " + primaryKey);
        recordStore.saveRecord(updateMessage.apply(existingRecord.getRecord()));
    });
}

@Nonnull
static Index addIndex(final boolean isSynthetic, final KeyExpression rootExpression,
final Map<String, String> options, final RecordMetaDataBuilder metaDataBuilder) {
Expand Down
Loading