Reindex subset of vertices
Signed-off-by: ntisseyre <[email protected]>
ntisseyre committed Nov 15, 2024
1 parent 3b8843f commit 6104281
Showing 21 changed files with 528 additions and 14 deletions.
@@ -747,7 +747,7 @@ public static void evaluateQuery(JanusGraphQuery query, ElementCategory resultTy
}

protected ScanMetrics executeScanJob(VertexScanJob job) throws Exception {
return executeScanJob(VertexJobConverter.convert(graph,job));
return executeScanJob(VertexJobConverter.convert(graph, job, null));
}

protected ScanMetrics executeScanJob(ScanJob job) throws Exception {
@@ -1451,6 +1451,90 @@ public void testCompositeVsMixedIndexing() {
assertTrue(tx.traversal().V().has("intId2", 234).hasNext());
}

@Test
public void testSubsetReindex() throws Exception {

clopen(option(FORCE_INDEX_USAGE), true);

mgmt.makeVertexLabel("cat").make();
mgmt.makeVertexLabel("dog").make();

makeKey("id", Integer.class);
makeKey("name", String.class);
final PropertyKey typeKey = makeKey("type", String.class);

String typeIndex = "searchByType";
mgmt.buildIndex(typeIndex, Vertex.class)
.addKey(typeKey)
.buildCompositeIndex();
mgmt.commit();

//Cats
int catsCount = 3;
for (int i = 0; i < catsCount; i++) {
Vertex v = tx.addVertex("cat");
v.property("id", i);
v.property("name", "cat_" + i);
v.property("type", "cat");
}

//Dogs
for (int i = 0; i < 5; i++) {
Vertex v = tx.addVertex("dog");
v.property("id", i);
v.property("name", "dog_" + i);
v.property("type", "dog");
}

tx.commit();

//Select a subset of vertices to index
clopen(option(FORCE_INDEX_USAGE), true);
List<Vertex> cats = tx.traversal().V().has("type", "cat").toList();
assertEquals(catsCount, cats.size());

List<Vertex> dogs = tx.traversal().V().has("type", "dog").toList();
assertEquals(5, dogs.size());
tx.rollback();

//Create new Index
graph.getOpenTransactions().forEach(JanusGraphTransaction::rollback);
mgmt = graph.openManagement();
mgmt.getOpenInstances().stream().filter(i -> !i.contains("current")).forEach(i -> mgmt.forceCloseInstance(i));
mgmt.commit();

String catsNameIndex = "searchByName_CatsOnly";
mgmt = graph.openManagement();
mgmt.buildIndex(catsNameIndex, Vertex.class)
.addKey(mgmt.getPropertyKey("name"))
.indexOnly(mgmt.getVertexLabel("cat"))
.buildCompositeIndex();
mgmt.commit();

//Move the index to REGISTERED status
mgmt = graph.openManagement();
mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REGISTER_INDEX).get();
mgmt.commit();
ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.REGISTERED).call();

//Reindex a given subset: cat_0 and cat_1 only
List<Object> subset = cats.subList(0, 2).stream().map(Element::id).collect(Collectors.toList());

mgmt = graph.openManagement();
mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REINDEX, subset).get();
mgmt.commit();
ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.ENABLED).call();

clopen(option(FORCE_INDEX_USAGE), true);

for (int i = 0; i < catsCount; i++) {
List<Vertex> catsByName = tx.traversal().V().hasLabel("cat").has("name", "cat_" + i).toList();
int expectedCount = (i == 2) ? 0 : 1;
assertEquals(expectedCount, catsByName.size());
}
tx.rollback();
}

@Test
public void testIndexInlineProperties() throws NoSuchMethodException {

@@ -25,6 +25,7 @@
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanJobFuture;

import java.time.Duration;
import java.util.List;
import java.util.Set;

/**
@@ -341,6 +342,17 @@ interface IndexBuilder {
*/
ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int numOfThreads);

/**
* Updates the provided index according to the given {@link SchemaAction},
* restricted to the given subset of vertices.
*
* @param index the index to update
* @param updateAction the schema action to apply
* @param vertexOnly list of vertex ids; only these vertices are considered for the index update
* @return a future that completes when the index action is done
*/
ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<Object> vertexOnly);

/**
* If an index update job was triggered through {@link #updateIndex(Index, SchemaAction)} with schema actions
* {@link org.janusgraph.core.schema.SchemaAction#REINDEX} or {@link org.janusgraph.core.schema.SchemaAction#DISCARD_INDEX}
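For orientation, a minimal usage sketch of the new updateIndex(Index, SchemaAction, List<Object>) overload, modelled on testSubsetReindex above; the index name, label, and vertex selection are illustrative and not prescribed by this commit:

import java.util.List;

import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.core.schema.SchemaAction;
import org.janusgraph.core.schema.SchemaStatus;
import org.janusgraph.graphdb.database.management.ManagementSystem;

public class SubsetReindexExample {

    public static void reindexSubset(JanusGraph graph) throws Exception {
        // Collect the ids of the vertices whose index entries should be (re)built.
        List<Object> vertexIds = graph.traversal().V()
            .hasLabel("cat")                         // illustrative label
            .limit(2)
            .id()
            .toList();

        // Run REINDEX for that subset only; vertices outside the list are left untouched.
        JanusGraphManagement mgmt = graph.openManagement();
        mgmt.updateIndex(mgmt.getGraphIndex("searchByName_CatsOnly"), SchemaAction.REINDEX, vertexIds).get();
        mgmt.commit();

        // Wait for the index to become ENABLED, as the test above does.
        ManagementSystem.awaitGraphIndexStatus(graph, "searchByName_CatsOnly")
            .status(SchemaStatus.ENABLED)
            .call();
    }
}

As the assertions at the end of the test show, vertices that are not in the supplied id list simply do not get entries in the new index.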
@@ -28,6 +28,7 @@
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_PASSWORD;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_USERNAME;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.CONNECTION_TIMEOUT;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.KEYS_SIZE;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PAGE_SIZE;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_HOSTS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_PORT;
@@ -69,6 +70,7 @@ public enum Deployment {
protected final int port;
protected final Duration connectionTimeoutMS;
protected final int pageSize;
protected final int keysSize;

protected final String username;
protected final String password;
@@ -83,6 +85,7 @@ public DistributedStoreManager(Configuration storageConfig, int portDefault) {
else this.port = portDefault;
this.connectionTimeoutMS = storageConfig.get(CONNECTION_TIMEOUT);
this.pageSize = storageConfig.get(PAGE_SIZE);
this.keysSize = storageConfig.get(KEYS_SIZE);
this.times = storageConfig.get(TIMESTAMP_PROVIDER);

if (storageConfig.has(AUTH_USERNAME)) {
@@ -121,6 +124,15 @@ public int getPageSize() {
return pageSize;
}

/**
* Returns the configured keys size for this storage backend. The keys size determines
* how many keys/partitions are requested from storage within a single request.
* @return the configured keys size
*/
public int getKeysSize() {
return keysSize;
}

/*
* TODO this should go away once we have a JanusGraphConfig that encapsulates TimestampProvider
*/
janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyColumnValueStore.java
@@ -14,6 +14,7 @@

package org.janusgraph.diskstorage.keycolumnvalue;

import org.apache.commons.lang.NotImplementedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.EntryList;
@@ -181,6 +182,10 @@ default Map<SliceQuery, Map<StaticBuffer, EntryList>> getMultiSlices(MultiKeysQu
*/
void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException;

default KeyIterator getKeys(final List<StaticBuffer> keys, final SliceQuery query, final StoreTransaction txh) throws BackendException {
throw new NotImplementedException();
}

/**
* Returns a {@link KeyIterator} over all keys that fall within the key-range specified by the given query and have one or more columns matching the column-range.
* Calling {@link KeyIterator#getEntries()} returns the list of all entries that match the column-range specified by the given query.
@@ -61,6 +61,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
private final StoreTransaction storeTx;
private final List<SliceQuery> queries;
private final Predicate<StaticBuffer> keyFilter;
private final List<StaticBuffer> keysToScan;
private final Configuration graphConfiguration;
private final DataPuller[] pullThreads;
private final BlockingQueue<SliceResult>[] dataQueues;
@@ -72,6 +73,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
StoreTransaction storeTx,
List<SliceQuery> queries,
Predicate<StaticBuffer> keyFilter,
List<StaticBuffer> keysToScan,
BlockingQueue<Row> rowQueue,
Configuration graphConfiguration) throws BackendException {

@@ -80,6 +82,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
this.storeTx = storeTx;
this.queries = queries;
this.keyFilter = keyFilter;
this.keysToScan = keysToScan;
this.graphConfiguration = graphConfiguration;

this.dataQueues = new BlockingQueue[queries.size()];
@@ -189,8 +192,14 @@ private void addDataPuller(SliceQuery sq, StoreTransaction stx, int pos) throws
this.graphConfiguration.get(GraphDatabaseConfiguration.PAGE_SIZE));
dataQueues[pos] = queue;

DataPuller dp = new DataPuller(sq, queue,
KCVSUtil.getKeys(store,sq,storeFeatures, MAX_KEY_LENGTH,stx), keyFilter);
KeyIterator keyIterator;
if (keysToScan != null) {
keyIterator = store.getKeys(keysToScan, sq, stx);
} else {
keyIterator = KCVSUtil.getKeys(store, sq, storeFeatures, MAX_KEY_LENGTH, stx);
}

DataPuller dp = new DataPuller(sq, queue, keyIterator, keyFilter);
pullThreads[pos] = dp;
dp.setName("data-puller-" + pos); // setting the name for thread dumps!
dp.start();
@@ -113,6 +113,14 @@ default void workerIterationEnd(ScanMetrics metrics) {}
*/
List<SliceQuery> getQueries();

/**
* Get the keys to be scanned by the job. If {@code null} is returned, all keys will be scanned.
* @return the list of keys to scan, or {@code null} to scan all keys
*/
default List<StaticBuffer> getKeysToScan() {
return null;
}

/**
* A predicate that determines whether
* {@link #process(org.janusgraph.diskstorage.StaticBuffer, java.util.Map, ScanMetrics)}
@@ -167,7 +167,7 @@ private RowsCollector buildScanner(BlockingQueue<Row> processorQueue, List<Slice
job.getKeyFilter(), processorQueue);
} else {
return new MultiThreadsRowsCollector(store, storeFeatures, storeTx, queries,
job.getKeyFilter(), processorQueue, graphConfiguration);
job.getKeyFilter(), job.getKeysToScan(), processorQueue, graphConfiguration);
}
}

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/MetricInstrumentedStore.java
@@ -165,6 +165,18 @@ public void acquireLock(final StaticBuffer key,
});
}

@Override
public KeyIterator getKeys(final List<StaticBuffer> keys, final SliceQuery query, final StoreTransaction txh) throws BackendException {
return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> {
final KeyIterator ki = backend.getKeys(keys, query, txh);
if (txh.getConfiguration().hasGroupName()) {
return MetricInstrumentedIterator.of(ki, txh.getConfiguration().getGroupName(), metricsStoreName, M_GET_KEYS, M_ITERATOR);
} else {
return ki;
}
});
}

@Override
public KeyIterator getKeys(final KeyRangeQuery query, final StoreTransaction txh) throws BackendException {
return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> {
@@ -830,6 +830,10 @@ public boolean apply(@Nullable String s) {
"up to this many elements.",
ConfigOption.Type.MASKABLE, 100);

public static final ConfigOption<Integer> KEYS_SIZE = new ConfigOption<>(STORAGE_NS,"keys-size",
"The maximum amount of keys/partitions to retrieve from distributed storage system by JanusGraph in a single request.",
ConfigOption.Type.MASKABLE, 100);

public static final ConfigOption<Boolean> DROP_ON_CLEAR = new ConfigOption<>(STORAGE_NS, "drop-on-clear",
"Whether to drop the graph database (true) or delete rows (false) when clearing storage. " +
"Note that some backends always drop the graph database when clearing storage. Also note that indices are " +
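For reference, a sketch of how the new option could be set when opening a graph. The full property name storage.keys-size is an assumption derived from STORAGE_NS plus "keys-size", and the backend choice is purely illustrative:

import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class KeysSizeConfigExample {

    public static JanusGraph open() {
        return JanusGraphFactory.build()
            .set("storage.backend", "cql")    // illustrative backend, unrelated to this commit
            .set("storage.keys-size", 200)    // assumed key; raises the per-request key/partition batch from the default of 100
            .open();
    }
}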
janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementSystem.java
@@ -914,11 +914,20 @@ public JanusGraphIndex buildMixedIndex(String backingIndex) {
--------------- */
@Override
public ScanJobFuture updateIndex(Index index, SchemaAction updateAction) {
return updateIndex(index, updateAction, Runtime.getRuntime().availableProcessors());
return updateIndex(index, updateAction, null, Runtime.getRuntime().availableProcessors());
}

@Override
public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int numOfThreads) {
return updateIndex(index, updateAction, null, numOfThreads);
}

@Override
public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<Object> vertexOnly) {
return updateIndex(index, updateAction, vertexOnly, Runtime.getRuntime().availableProcessors());
}

private ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<Object> vertexOnly, int numOfThreads) {
Preconditions.checkArgument(index != null, "Need to provide an index");
Preconditions.checkArgument(updateAction != null, "Need to provide update action");

@@ -967,7 +976,7 @@ public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int num
builder.setFinishJob(indexId.getIndexJobFinisher(graph, SchemaAction.ENABLE_INDEX));
builder.setJobId(indexId);
builder.setNumProcessingThreads(numOfThreads);
builder.setJob(VertexJobConverter.convert(graph, new IndexRepairJob(indexId.indexName, indexId.relationTypeName)));
builder.setJob(VertexJobConverter.convert(graph, new IndexRepairJob(indexId.indexName, indexId.relationTypeName), vertexOnly));
try {
future = builder.execute();
} catch (BackendException e) {
@@ -28,7 +28,7 @@
public abstract class AbstractScanJob implements ScanJob {
protected final GraphProvider graph;
protected StandardJanusGraphTx tx;
private IDManager idManager;
protected IDManager idManager;

public AbstractScanJob(JanusGraph graph) {
this.graph = new GraphProvider();
janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java
@@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* @author Matthias Broecheler ([email protected])
@@ -50,23 +51,31 @@ public class VertexJobConverter extends AbstractScanJob {

protected final VertexScanJob job;

protected final List<Object> vertexIdsToScan;

protected VertexJobConverter(JanusGraph graph, VertexScanJob job) {
this(graph, job, null);
}

protected VertexJobConverter(JanusGraph graph, VertexScanJob job, List<Object> vertexIdsToScan) {
super(graph);
Preconditions.checkArgument(job!=null);
this.job = job;
this.vertexIdsToScan = vertexIdsToScan;
}

protected VertexJobConverter(VertexJobConverter copy) {
super(copy);
this.job = copy.job.clone();
this.vertexIdsToScan = copy.vertexIdsToScan;
}

public static ScanJob convert(JanusGraph graph, VertexScanJob vertexJob) {
return new VertexJobConverter(graph,vertexJob);
public static ScanJob convert(JanusGraph graph, VertexScanJob vertexJob, List<Object> vertexIdsToScan) {
return new VertexJobConverter(graph, vertexJob, vertexIdsToScan);
}

public static ScanJob convert(VertexScanJob vertexJob) {
return new VertexJobConverter(null,vertexJob);
return new VertexJobConverter(null, vertexJob, null);
}

@Override
@@ -130,6 +139,18 @@ public List<SliceQuery> getQueries() {
}
}

@Override
public List<StaticBuffer> getKeysToScan() {
if (this.vertexIdsToScan == null) {
return null;
} else {
return this.vertexIdsToScan
.stream()
.map(k -> idManager.getKey(k))
.collect(Collectors.toList());
}
}

@Override
public Predicate<StaticBuffer> getKeyFilter() {
return buffer -> !IDManager.VertexIDType.Invisible.is(getVertexId(buffer));