From c694659d9403f19fd7fa96c0ef0cd45567e44b08 Mon Sep 17 00:00:00 2001
From: Scott Dugas
Date: Thu, 5 Dec 2024 16:45:28 -0500
Subject: [PATCH] Add a test that does concurrent Lucene updates

---
 .../lucene/LuceneIndexMaintenanceTest.java    |  82 +++++++++
 .../lucene/LuceneIndexTestDataModel.java      |  51 +++++-
 .../lucene/LuceneIndexTestValidator.java      | 156 +++++++++++-------
 3 files changed, 220 insertions(+), 69 deletions(-)

diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
index ebf8dc8d32..9bb8dc845d 100644
--- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
+++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
@@ -75,6 +75,7 @@
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
@@ -86,6 +87,8 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -781,6 +784,85 @@ public void ensureValid() throws IOException {
         }
     }
 
+    @Test
+    void concurrentUpdate() throws IOException {
+        // Once the two issues noted below are fixed, we should make this parameterized and run with
+        // additional random configurations.
+        AtomicInteger threadCounter = new AtomicInteger();
+        // Synchronization blocks in FDBDirectoryWrapper can cause a deadlock
+        // (see https://github.com/FoundationDB/fdb-record-layer/issues/2989),
+        // so set the pool large enough to overcome that.
+        this.dbExtension.getDatabaseFactory().setExecutor(new ForkJoinPool(30,
+                pool -> {
+                    final ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+                    thread.setName("ConcurrentUpdatePool-" + threadCounter.getAndIncrement());
+                    return thread;
+                },
+                null, false));
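+        // The pool size of 30 is not precisely tuned; it just needs to comfortably exceed the number of
+        // tasks that can block one another in FDBDirectoryWrapper until the issue above is fixed.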
+        final long seed = 320947L;
+        final boolean isGrouped = true;
+        // Updating the parent & child concurrently is not thread safe. We may want to fix that behavior,
+        // or declare it unsupported, since it is equivalent to updating the same record concurrently,
+        // which is not generally supported.
+        final boolean isSynthetic = false;
+        final boolean primaryKeySegmentIndexEnabled = true;
+        // LucenePartitioner is not thread safe, and the counts get broken
+        // (see https://github.com/FoundationDB/fdb-record-layer/issues/2990),
+        // so run unpartitioned for now.
+        final int partitionHighWatermark = -1;
+        final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager)
+                .setIsGrouped(isGrouped)
+                .setIsSynthetic(isSynthetic)
+                .setPrimaryKeySegmentIndexEnabled(primaryKeySegmentIndexEnabled)
+                .setPartitionHighWatermark(partitionHighWatermark)
+                .build();
+
+        final int repartitionCount = 10;
+        final int loopCount = 30;
+
+        final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder()
+                .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, repartitionCount)
+                .addProp(LuceneRecordContextProperties.LUCENE_MAX_DOCUMENTS_TO_MOVE_DURING_REPARTITIONING, dataModel.nextInt(1000) + repartitionCount)
+                .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0
+                .build();
+        for (int i = 0; i < loopCount; i++) {
+            LOGGER.info(KeyValueLogMessage.of("concurrentUpdate loop",
+                    "iteration", i,
+                    "groupCount", dataModel.groupingKeyToPrimaryKeyToPartitionKey.size(),
+                    "docCount", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).sum(),
+                    "docMinPerGroup", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).min(),
+                    "docMaxPerGroup", dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).max()));
+
+            long start = 234098;
+            try (FDBRecordContext context = openContext(contextProps)) {
+                dataModel.saveRecords(10, start, context, 1);
+                commit(context);
+            }
+            explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);
+        }
+
+        try (FDBRecordContext context = openContext(contextProps)) {
+            FDBRecordStore recordStore = Objects.requireNonNull(dataModel.schemaSetup.apply(context));
+            recordStore.getIndexDeferredMaintenanceControl().setAutoMergeDuringCommit(false);
+            assertThat(dataModel.updateableRecords, Matchers.aMapWithSize(Matchers.greaterThan(30)));
+            LOGGER.info("concurrentUpdate: Starting updates");
+            RecordCursor.fromList(new ArrayList<>(dataModel.updateableRecords.entrySet()))
+                    .mapPipelined(entry -> dataModel.updateRecord(recordStore, entry.getKey(), entry.getValue()), 10)
+                    .asList().join();
+            commit(context);
+        }
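+        // All of the updates above run in one transaction: mapPipelined keeps up to 10 update futures in
+        // flight concurrently, which is what exercises the concurrent index-maintenance paths.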
"child_str_value:forth" : "text_value:about"); + + if (isGrouped) { + validateDeleteWhere(isSynthetic, dataModel.groupingKeyToPrimaryKeyToPartitionKey, contextProps, dataModel.schemaSetup, dataModel.index); + } + } + static Stream concurrentStoreTest() { return Stream.concat( Stream.of( diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java index 58e12ba3cb..8724c8f913 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java @@ -34,6 +34,7 @@ import com.apple.foundationdb.record.test.TestKeySpace; import com.apple.foundationdb.record.test.TestKeySpacePathManagerExtension; import com.apple.foundationdb.tuple.Tuple; +import com.google.protobuf.Message; import javax.annotation.Nonnull; import java.util.HashMap; @@ -41,6 +42,7 @@ import java.util.Objects; import java.util.Random; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; /** @@ -64,6 +66,7 @@ public class LuceneIndexTestDataModel { *
      */
     final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey;
+    Map<Tuple, Function<Message, Message>> updateableRecords;
 
     private LuceneIndexTestDataModel(final Builder builder) {
         random = builder.random;
@@ -92,6 +95,7 @@ private LuceneIndexTestDataModel(final Builder builder) {
             return store;
         };
         groupingKeyToPrimaryKeyToPartitionKey = new HashMap<>();
+        updateableRecords = new HashMap<>();
     }
 
     @Override
@@ -121,7 +125,7 @@ void saveRecords(int count, long start, FDBRecordContext context, final int grou
         FDBRecordStore recordStore = Objects.requireNonNull(schemaSetup.apply(context));
         for (int j = 0; j < count; j++) {
             LuceneIndexTestDataModel.saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey,
-                    textGenerator, start, recordStore, group);
+                    textGenerator, start, recordStore, group, updateableRecords);
         }
     }
 
@@ -129,10 +133,17 @@ static void saveRecord(final boolean isGrouped, final boolean isSynthetic, final
                            final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey,
                            final RandomTextGenerator textGenerator, final long start, final FDBRecordStore recordStore,
                            final int group) {
+        saveRecord(isGrouped, isSynthetic, random, groupingKeyToPrimaryKeyToPartitionKey, textGenerator, start, recordStore, group, new HashMap<>());
+    }
+
+    static void saveRecord(final boolean isGrouped, final boolean isSynthetic, final Random random,
+                           final Map<Tuple, Map<Tuple, Tuple>> groupingKeyToPrimaryKeyToPartitionKey,
+                           final RandomTextGenerator textGenerator, final long start, final FDBRecordStore recordStore,
+                           final int group, final Map<Tuple, Function<Message, Message>> updateableRecords) {
         final Tuple groupTuple = isGrouped ? Tuple.from(group) : Tuple.from();
         final int countInGroup = groupingKeyToPrimaryKeyToPartitionKey.computeIfAbsent(groupTuple, key -> new HashMap<>()).size();
         long timestamp = start + countInGroup + random.nextInt(20) - 5;
-        final Tuple primaryKey = saveRecord(recordStore, isSynthetic, group, countInGroup, timestamp, textGenerator, random);
+        final Tuple primaryKey = saveRecord(recordStore, isSynthetic, group, countInGroup, timestamp, textGenerator, random, updateableRecords);
         groupingKeyToPrimaryKeyToPartitionKey.computeIfAbsent(groupTuple, key -> new HashMap<>())
                 .put(primaryKey, Tuple.from(timestamp).addAll(primaryKey));
     }
 
@@ -144,7 +155,8 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
                             final int countInGroup,
                             final long timestamp,
                             final RandomTextGenerator textGenerator,
-                            final Random random) {
+                            final Random random,
+                            final Map<Tuple, Function<Message, Message>> updateableRecords) {
         var parent = TestRecordsGroupedParentChildProto.MyParentRecord.newBuilder()
                 .setGroup(group)
                 .setRecNo(1001L + countInGroup)
@@ -155,6 +167,8 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
                 .setChildRecNo(1000L - countInGroup)
                 .build();
         Tuple primaryKey;
+        final Tuple parentPrimaryKey = recordStore.saveRecord(parent).getPrimaryKey();
+        updateableRecords.put(parentPrimaryKey, existingRecord -> updateParentRecord(existingRecord, random));
         if (isSynthetic) {
             var child = TestRecordsGroupedParentChildProto.MyChildRecord.newBuilder()
                     .setGroup(group)
@@ -165,15 +179,40 @@ static Tuple saveRecord(final FDBRecordStore recordStore,
             final Tuple syntheticRecordTypeKey = recordStore.getRecordMetaData()
                     .getSyntheticRecordType("JoinChildren")
                     .getRecordTypeKeyTuple();
+            final Tuple childPrimaryKey = recordStore.saveRecord(child).getPrimaryKey();
+            updateableRecords.put(childPrimaryKey, existingRecord -> updateChildRecord(existingRecord, random));
             primaryKey = Tuple.from(syntheticRecordTypeKey.getItems().get(0),
-                    recordStore.saveRecord(parent).getPrimaryKey().getItems(),
-                    recordStore.saveRecord(child).getPrimaryKey().getItems());
+                    parentPrimaryKey.getItems(),
+                    childPrimaryKey.getItems());
         } else {
-            primaryKey = recordStore.saveRecord(parent).getPrimaryKey();
+            primaryKey = parentPrimaryKey;
         }
         return primaryKey;
     }
 
+    private static Message updateParentRecord(Message existingRecord, final Random random) {
+        final var builder = TestRecordsGroupedParentChildProto.MyParentRecord.newBuilder();
+        builder.mergeFrom(existingRecord);
+        builder.setIntValue(random.nextInt());
+        return builder.build();
+    }
+
+    private static Message updateChildRecord(final Message existingRecord, final Random random) {
+        final var builder = TestRecordsGroupedParentChildProto.MyChildRecord.newBuilder();
+        builder.mergeFrom(existingRecord);
+        builder.setOtherValue(random.nextInt());
+        return builder.build();
+    }
+
+    public CompletableFuture<Void> updateRecord(final FDBRecordStore recordStore,
+                                                Tuple primaryKey,
+                                                Function<Message, Message> updateMessage) {
+        return recordStore.loadRecordAsync(primaryKey).thenAccept(existingRecord -> {
+            recordStore.saveRecord(updateMessage.apply(existingRecord.getRecord()));
+        });
+    }
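+
+    // Note that the load-modify-save in updateRecord runs against the caller's open record store, so the
+    // pipelined updates in the test all share one transaction rather than competing transactions.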
+
     @Nonnull
     static Index addIndex(final boolean isSynthetic, final KeyExpression rootExpression,
                           final Map<String, String> options, final RecordMetaDataBuilder metaDataBuilder) {
diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java
index d72aabb3c3..b6ba3bf270 100644
--- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java
+++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java
@@ -59,6 +59,7 @@
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -112,11 +113,8 @@ void validate(Index index, final Map<Tuple, Map<Tuple, Tuple>> expectedDocumentI
      */
     void validate(Index index, final Map<Tuple, Map<Tuple, Tuple>> expectedDocumentInformation,
                   final String universalSearch, final boolean allowDuplicatePrimaryKeys) throws IOException {
-        final int partitionHighWatermark = Integer.parseInt(index.getOption(LuceneIndexOptions.INDEX_PARTITION_HIGH_WATERMARK));
-        String partitionLowWaterMarkStr = index.getOption(LuceneIndexOptions.INDEX_PARTITION_LOW_WATERMARK);
-        final int partitionLowWatermark = partitionLowWaterMarkStr == null ?
-                Math.max(LucenePartitioner.DEFAULT_PARTITION_LOW_WATERMARK, 1) :
-                Integer.parseInt(partitionLowWaterMarkStr);
+        final int partitionHighWatermark = getPartitionHighWatermark(index);
+        final int partitionLowWatermark = getPartitionLowWatermark(index);
 
         Map<Tuple, Set<Tuple>> missingDocuments = new HashMap<>();
         expectedDocumentInformation.forEach((groupingKey, groupedIds) -> {
@@ -128,71 +126,102 @@ void validate(Index index, final Map<Tuple, Map<Tuple, Tuple>> expectedDocumentI
                     "group", groupingKey,
                     "expectedCount", entry.getValue().size()));
 
-            final Set<Map.Entry<Tuple, Tuple>> entries = entry.getValue().entrySet();
-            final List<Tuple> records = entries.stream()
-                    // I think in theory, this should be able to be:
-                    // Map.Entry.comparingByValue().thenComparing(Map.Entry.comparingByKey())
-                    // but I could not get the types to work
-                    .sorted((c1, c2) -> {
-                        final int valueComparison = c1.getValue().compareTo(c2.getValue());
-                        if (valueComparison != 0) {
-                            return valueComparison;
-                        } else {
-                            return c1.getKey().compareTo(c2.getKey());
-                        }
-                    })
+            final List<Tuple> records = entry.getValue().entrySet().stream()
+                    .sorted(Map.Entry.<Tuple, Tuple>comparingByValue().thenComparing(Map.Entry.comparingByKey()))
                     .map(Map.Entry::getKey)
                     .collect(Collectors.toList());
 
-            List<LucenePartitionInfoProto.LucenePartitionInfo> partitionInfos = getPartitionMeta(index, groupingKey);
-            partitionInfos.sort(Comparator.comparing(info -> Tuple.fromBytes(info.getFrom().toByteArray())));
-            final String allCounts = partitionInfos.stream()
-                    .map(info -> Tuple.fromBytes(info.getFrom().toByteArray()).toString() + info.getCount())
-                    .collect(Collectors.joining(",", "[", "]"));
-            Set<Integer> usedPartitionIds = new HashSet<>();
-            Tuple lastToTuple = null;
-            int visitedCount = 0;
-
-            try (FDBRecordContext context = contextProvider.get()) {
-                final FDBRecordStore recordStore = schemaSetup.apply(context);
-
-                for (int i = 0; i < partitionInfos.size(); i++) {
-                    final LucenePartitionInfoProto.LucenePartitionInfo partitionInfo = partitionInfos.get(i);
-                    if (LOGGER.isTraceEnabled()) {
-                        LOGGER.trace("Group: " + groupingKey + " PartitionInfo[" + partitionInfo.getId() +
-                                "]: count:" + partitionInfo.getCount() + " " +
"-> " + - Tuple.fromBytes(partitionInfo.getTo().toByteArray())); - } + private void validatePartitionedGroup(final Index index, final String universalSearch, final boolean allowDuplicatePrimaryKeys, final Tuple groupingKey, final int partitionLowWatermark, final int partitionHighWatermark, final List records, final Map> missingDocuments) throws IOException { + List partitionInfos = getPartitionMeta(index, groupingKey); + partitionInfos.sort(Comparator.comparing(info -> Tuple.fromBytes(info.getFrom().toByteArray()))); + final String allCounts = partitionInfos.stream() + .map(info -> Tuple.fromBytes(info.getFrom().toByteArray()).toString() + info.getCount()) + .collect(Collectors.joining(",", "[", "]")); + Set usedPartitionIds = new HashSet<>(); + Tuple lastToTuple = null; + int visitedCount = 0; - assertTrue(isParititionCountWithinBounds(partitionInfos, i, partitionLowWatermark, partitionHighWatermark), - "Group: " + groupingKey + " - " + allCounts + "\nlowWatermark: " + partitionLowWatermark + ", highWatermark: " + partitionHighWatermark + - "\nCurrent count: " + partitionInfo.getCount()); - assertTrue(usedPartitionIds.add(partitionInfo.getId()), () -> "Duplicate id: " + partitionInfo); - final Tuple fromTuple = Tuple.fromBytes(partitionInfo.getFrom().toByteArray()); - if (i > 0) { - assertThat(fromTuple, greaterThan(lastToTuple)); - } - lastToTuple = Tuple.fromBytes(partitionInfo.getTo().toByteArray()); - assertThat(fromTuple, lessThanOrEqualTo(lastToTuple)); + try (FDBRecordContext context = contextProvider.get()) { + final FDBRecordStore recordStore = schemaSetup.apply(context); + for (int i = 0; i < partitionInfos.size(); i++) { + final LucenePartitionInfoProto.LucenePartitionInfo partitionInfo = partitionInfos.get(i); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Group: " + groupingKey + " PartitionInfo[" + partitionInfo.getId() + + "]: count:" + partitionInfo.getCount() + " " + + Tuple.fromBytes(partitionInfo.getFrom().toByteArray()) + "-> " + + Tuple.fromBytes(partitionInfo.getTo().toByteArray())); + } - LOGGER.debug(KeyValueLogMessage.of("Visited partition", - "group", groupingKey, - "documentsSoFar", visitedCount, - "documentsInGroup", records.size(), - "partitionInfo.count", partitionInfo.getCount())); - final Set expectedPrimaryKeys = Set.copyOf(records.subList(visitedCount, visitedCount + partitionInfo.getCount())); - validateDocsInPartition(recordStore, index, partitionInfo.getId(), groupingKey, - expectedPrimaryKeys, - universalSearch); - visitedCount += partitionInfo.getCount(); - validatePrimaryKeySegmentIndex(recordStore, index, groupingKey, partitionInfo.getId(), - expectedPrimaryKeys, allowDuplicatePrimaryKeys); - expectedPrimaryKeys.forEach(primaryKey -> missingDocuments.get(groupingKey).remove(primaryKey)); + assertTrue(isParititionCountWithinBounds(partitionInfos, i, partitionLowWatermark, partitionHighWatermark), + "Group: " + groupingKey + " - " + allCounts + "\nlowWatermark: " + partitionLowWatermark + ", highWatermark: " + partitionHighWatermark + + "\nCurrent count: " + partitionInfo.getCount()); + assertTrue(usedPartitionIds.add(partitionInfo.getId()), () -> "Duplicate id: " + partitionInfo); + final Tuple fromTuple = Tuple.fromBytes(partitionInfo.getFrom().toByteArray()); + if (i > 0) { + assertThat(fromTuple, greaterThan(lastToTuple)); } + lastToTuple = Tuple.fromBytes(partitionInfo.getTo().toByteArray()); + assertThat(fromTuple, lessThanOrEqualTo(lastToTuple)); + + LOGGER.debug(KeyValueLogMessage.of("Visited partition", + "group", groupingKey, + 
"documentsSoFar", visitedCount, + "documentsInGroup", records.size(), + "partitionInfo.count", partitionInfo.getCount())); + // if partitionInfo.getCount() is wrong, this can be very confusing, so a different assertion might be + // worthwhile + final Set expectedPrimaryKeys = Set.copyOf(records.subList(visitedCount, + visitedCount + partitionInfo.getCount())); + validateDocsInPartition(recordStore, index, partitionInfo.getId(), groupingKey, + expectedPrimaryKeys, + universalSearch); + visitedCount += partitionInfo.getCount(); + assertThat(records.size(), greaterThanOrEqualTo(visitedCount)); + validatePrimaryKeySegmentIndex(recordStore, index, groupingKey, partitionInfo.getId(), + expectedPrimaryKeys, allowDuplicatePrimaryKeys); + expectedPrimaryKeys.forEach(primaryKey -> missingDocuments.get(groupingKey).remove(primaryKey)); } } - missingDocuments.entrySet().removeIf(entry -> entry.getValue().isEmpty()); - assertEquals(Map.of(), missingDocuments, "We should have found all documents in the index"); + } + + private static int getPartitionLowWatermark(final Index index) { + String partitionLowWaterMarkStr = index.getOption(LuceneIndexOptions.INDEX_PARTITION_LOW_WATERMARK); + if (partitionLowWaterMarkStr == null) { + return Math.max(LucenePartitioner.DEFAULT_PARTITION_LOW_WATERMARK, 1); + } else { + return Integer.parseInt(partitionLowWaterMarkStr); + } + } + + private static int getPartitionHighWatermark(final Index index) { + final String option = index.getOption(LuceneIndexOptions.INDEX_PARTITION_HIGH_WATERMARK); + if (option == null) { + return -1; + } else { + return Integer.parseInt(option); + } } List getPartitionMeta(Index index, @@ -226,7 +255,8 @@ int getPartitionExtraCapacity(int count, int highWatermark) { return Math.max(0, highWatermark - count); } - public static void validateDocsInPartition(final FDBRecordStore recordStore, Index index, int partitionId, Tuple groupingKey, + public static void validateDocsInPartition(final FDBRecordStore recordStore, Index index, + @Nullable Integer partitionId, Tuple groupingKey, Set expectedPrimaryKeys, final String universalSearch) throws IOException { LuceneScanQuery scanQuery; if (groupingKey.isEmpty()) { @@ -262,7 +292,7 @@ public static void validateDocsInPartition(final FDBRecordStore recordStore, Ind } public static IndexReader getIndexReader(final FDBRecordStore recordStore, final Index index, - final Tuple groupingKey, final int partitionId) throws IOException { + final Tuple groupingKey, @Nullable final Integer partitionId) throws IOException { final FDBDirectoryManager manager = getDirectoryManager(recordStore, index); return manager.getIndexReader(groupingKey, partitionId); }