Reindex subset of vertices #4726
Open · wants to merge 1 commit into base: master
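This pull request adds a management API overload that restricts a reindex job to an explicit list of vertex ids. The following is a minimal sketch of how the new overload could be called, modeled on the new test below; the index name and the assumption that the index is already REGISTERED are illustrative.

import java.util.List;

import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.core.schema.SchemaAction;
import org.janusgraph.core.schema.SchemaStatus;
import org.janusgraph.graphdb.database.management.ManagementSystem;

public class SubsetReindexSketch {

    // Reindexes only the given vertex ids for an already REGISTERED index
    // (the index name "searchByName_CatsOnly" is taken from the new test).
    public static void reindexSubset(JanusGraph graph, List<Object> vertexIds) throws Exception {
        JanusGraphManagement mgmt = graph.openManagement();
        mgmt.updateIndex(mgmt.getGraphIndex("searchByName_CatsOnly"), SchemaAction.REINDEX, vertexIds).get();
        mgmt.commit();
        // Wait until the index is usable, as the test does.
        ManagementSystem.awaitGraphIndexStatus(graph, "searchByName_CatsOnly")
                .status(SchemaStatus.ENABLED).call();
    }
}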
@@ -747,7 +747,7 @@ public static void evaluateQuery(JanusGraphQuery query, ElementCategory resultTy
}

protected ScanMetrics executeScanJob(VertexScanJob job) throws Exception {
return executeScanJob(VertexJobConverter.convert(graph,job));
return executeScanJob(VertexJobConverter.convert(graph, job, null));
}

protected ScanMetrics executeScanJob(ScanJob job) throws Exception {
@@ -102,6 +102,7 @@
import org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex;
import org.janusgraph.graphdb.vertices.CacheVertex;
import org.janusgraph.testutil.TestGraphConfigs;
import org.javatuples.Pair;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -1451,6 +1452,94 @@ public void testCompositeVsMixedIndexing() {
assertTrue(tx.traversal().V().has("intId2", 234).hasNext());
}

@Test
public void testSubsetReindex() throws Exception {

clopen(option(FORCE_INDEX_USAGE), true);

mgmt.makeVertexLabel("cat").make();
mgmt.makeVertexLabel("dog").make();

makeKey("id", Integer.class);
makeKey("name", String.class);
final PropertyKey typeKey = makeKey("type", String.class);

String typeIndex = "searchByType";
mgmt.buildIndex(typeIndex, Vertex.class)
.addKey(typeKey)
.buildCompositeIndex();
mgmt.commit();

//Cats
int catsCount = 3;
for (int i = 0; i < catsCount; i++) {
Vertex v = tx.addVertex("cat");
v.property("id", i);
v.property("name", "cat_" + i);
v.property("type", "cat");
}

//Dogs
for (int i = 0; i < 5; i++) {
Vertex v = tx.addVertex("dog");
v.property("id", i);
v.property("name", "dog_" + i);
v.property("type", "dog");
}

tx.commit();

//Select a subset of vertices to index
clopen(option(FORCE_INDEX_USAGE), true);
List<Vertex> cats = tx.traversal().V().has("type", "cat").toList();
assertEquals(catsCount, cats.size());
String excludedCat = cats.get(cats.size() - 1).value("name");
List<Pair<Object, String>> catsSubset = cats.subList(0, cats.size() - 1).stream()
.map(kitty -> new Pair<Object, String>(kitty.id(), kitty.value("name")))
.collect(Collectors.toList());

List<Vertex> dogs = tx.traversal().V().has("type", "dog").toList();
assertEquals(5, dogs.size());
tx.rollback();

//Create new Index
graph.getOpenTransactions().forEach(JanusGraphTransaction::rollback);
mgmt = graph.openManagement();
mgmt.getOpenInstances().stream().filter(i -> !i.contains("current")).forEach(i -> mgmt.forceCloseInstance(i));
mgmt.commit();

String catsNameIndex = "searchByName_CatsOnly";
mgmt = graph.openManagement();
mgmt.buildIndex(catsNameIndex, Vertex.class)
.addKey(mgmt.getPropertyKey("name"))
.indexOnly(mgmt.getVertexLabel("cat"))
.buildCompositeIndex();
mgmt.commit();

//Mark index as REGISTERED
mgmt = graph.openManagement();
mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REGISTER_INDEX).get();
mgmt.commit();
ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.REGISTERED).call();

//Reindex a given subset
List<Object> reIndexOnlyIds = catsSubset.stream().map(Pair::getValue0).collect(Collectors.toList());
mgmt = graph.openManagement();
mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REINDEX, reIndexOnlyIds).get();
mgmt.commit();
ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.ENABLED).call();

clopen(option(FORCE_INDEX_USAGE), true);
catsSubset.forEach(kitty -> {
List<Vertex> catsByName = tx.traversal().V().hasLabel("cat").has("name", kitty.getValue1()).toList();
assertEquals(1, catsByName.size());
});

List<Vertex> catsByName = tx.traversal().V().hasLabel("cat").has("name", excludedCat).toList();
assertEquals(0, catsByName.size());
tx.rollback();
}

@Test
public void testIndexInlineProperties() throws NoSuchMethodException {

@@ -25,6 +25,7 @@
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanJobFuture;

import java.time.Duration;
import java.util.List;
import java.util.Set;

/**
@@ -341,6 +342,17 @@ interface IndexBuilder {
*/
ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int numOfThreads);

/**
* Updates the provided index according to the given {@link SchemaAction},
* restricted to the given subset of vertices.
*
* @param index the index to update
* @param updateAction the schema action to apply
* @param vertexOnly list of vertex ids to which the index update is restricted
* @return a future that completes when the index action is done
*/
ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<Object> vertexOnly);

/**
* If an index update job was triggered through {@link #updateIndex(Index, SchemaAction)} with schema actions
* {@link org.janusgraph.core.schema.SchemaAction#REINDEX} or {@link org.janusgraph.core.schema.SchemaAction#DISCARD_INDEX}
@@ -28,6 +28,7 @@
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_PASSWORD;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_USERNAME;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.CONNECTION_TIMEOUT;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.KEYS_SIZE;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PAGE_SIZE;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_HOSTS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_PORT;
@@ -69,6 +70,7 @@ public enum Deployment {
protected final int port;
protected final Duration connectionTimeoutMS;
protected final int pageSize;
protected final int keysSize;

protected final String username;
protected final String password;
@@ -83,6 +85,7 @@ public DistributedStoreManager(Configuration storageConfig, int portDefault) {
else this.port = portDefault;
this.connectionTimeoutMS = storageConfig.get(CONNECTION_TIMEOUT);
this.pageSize = storageConfig.get(PAGE_SIZE);
this.keysSize = storageConfig.get(KEYS_SIZE);
this.times = storageConfig.get(TIMESTAMP_PROVIDER);

if (storageConfig.has(AUTH_USERNAME)) {
@@ -121,6 +124,15 @@ public int getPageSize() {
return pageSize;
}

/**
* Returns the configured keys size for this storage backend. The keys size determines
* how many keys/partitions are requested from storage within a single request.
* @return the configured keys size
*/
public int getKeysSize() {
return keysSize;
}

/*
* TODO this should go away once we have a JanusGraphConfig that encapsulates TimestampProvider
*/
@@ -14,6 +14,7 @@

package org.janusgraph.diskstorage.keycolumnvalue;

import org.apache.commons.lang.NotImplementedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.EntryList;
@@ -181,6 +182,10 @@ default Map<SliceQuery, Map<StaticBuffer, EntryList>> getMultiSlices(MultiKeysQu
*/
void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException;

/**
 * Returns a {@link KeyIterator} over the given keys, restricted to the column-range of the given query.
 * Backends that do not support scanning an explicit list of keys keep this default, which throws
 * {@link NotImplementedException}.
 */
default KeyIterator getKeys(final List<StaticBuffer> keys, final SliceQuery query, final StoreTransaction txh) throws BackendException {
    throw new NotImplementedException();
}
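The PR leaves this new overload with a throwing default, so only backends that override it support key-list scans. Purely as an illustration (not part of this change), a backend without a native multi-key scan could satisfy the contract by issuing one slice read per requested key; the ListBackedKeyIterator class below is a hypothetical sketch built only on existing JanusGraph types.

import java.util.Iterator;
import java.util.List;

import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.EntryList;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator;
import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery;
import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.diskstorage.util.RecordIterator;

// Hypothetical helper: iterates the requested keys and fetches each key's
// column slice on demand in getEntries().
final class ListBackedKeyIterator implements KeyIterator {

    private final KeyColumnValueStore store;
    private final SliceQuery query;
    private final StoreTransaction txh;
    private final Iterator<StaticBuffer> keys;
    private StaticBuffer currentKey;

    ListBackedKeyIterator(KeyColumnValueStore store, List<StaticBuffer> keys,
                          SliceQuery query, StoreTransaction txh) {
        this.store = store;
        this.keys = keys.iterator();
        this.query = query;
        this.txh = txh;
    }

    @Override
    public boolean hasNext() {
        return keys.hasNext();
    }

    @Override
    public StaticBuffer next() {
        currentKey = keys.next();
        return currentKey;
    }

    @Override
    public RecordIterator<Entry> getEntries() {
        // Valid only after next() has been called, mirroring the KeyIterator contract.
        final EntryList slice;
        try {
            slice = store.getSlice(new KeySliceQuery(currentKey, query), txh);
        } catch (BackendException e) {
            throw new IllegalStateException(e);
        }
        final Iterator<Entry> entries = slice.iterator();
        return new RecordIterator<Entry>() {
            @Override public boolean hasNext() { return entries.hasNext(); }
            @Override public Entry next() { return entries.next(); }
            @Override public void close() { /* nothing to release */ }
        };
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}

A backend could then return new ListBackedKeyIterator(this, keys, query, txh) from its override of the getKeys(List, SliceQuery, StoreTransaction) method shown above.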

/**
* Returns a {@link KeyIterator} over all keys that fall within the key-range specified by the given query and have one or more columns matching the column-range.
* Calling {@link KeyIterator#getEntries()} returns the list of all entries that match the column-range specified by the given query.
@@ -61,6 +61,7 @@ class MultiThreadsRowsCollector extends RowsCollector {
private final StoreTransaction storeTx;
private final List<SliceQuery> queries;
private final Predicate<StaticBuffer> keyFilter;
private final List<StaticBuffer> keysToScan;
private final Configuration graphConfiguration;
private final DataPuller[] pullThreads;
private final BlockingQueue<SliceResult>[] dataQueues;
@@ -72,6 +73,7 @@
StoreTransaction storeTx,
List<SliceQuery> queries,
Predicate<StaticBuffer> keyFilter,
List<StaticBuffer> keysToScan,
BlockingQueue<Row> rowQueue,
Configuration graphConfiguration) throws BackendException {

@@ -80,6 +82,7 @@
this.storeTx = storeTx;
this.queries = queries;
this.keyFilter = keyFilter;
this.keysToScan = keysToScan;
this.graphConfiguration = graphConfiguration;

this.dataQueues = new BlockingQueue[queries.size()];
@@ -189,8 +192,14 @@ private void addDataPuller(SliceQuery sq, StoreTransaction stx, int pos) throws
this.graphConfiguration.get(GraphDatabaseConfiguration.PAGE_SIZE));
dataQueues[pos] = queue;

DataPuller dp = new DataPuller(sq, queue,
KCVSUtil.getKeys(store,sq,storeFeatures, MAX_KEY_LENGTH,stx), keyFilter);
KeyIterator keyIterator;
if (keysToScan != null) {
keyIterator = store.getKeys(keysToScan, sq, stx);
} else {
keyIterator = KCVSUtil.getKeys(store, sq, storeFeatures, MAX_KEY_LENGTH, stx);
}

DataPuller dp = new DataPuller(sq, queue, keyIterator, keyFilter);
pullThreads[pos] = dp;
dp.setName("data-puller-" + pos); // setting the name for thread dumps!
dp.start();
@@ -113,6 +113,14 @@ default void workerIterationEnd(ScanMetrics metrics) {}
*/
List<SliceQuery> getQueries();

/**
* Returns the keys this job should scan. If the result is null, all keys will be scanned.
* @return the keys to scan, or null to scan all keys
*/
default List<StaticBuffer> getKeysToScan() {
return null;
}

/**
* A predicate that determines whether
* {@link #process(org.janusgraph.diskstorage.StaticBuffer, java.util.Map, ScanMetrics)}
@@ -167,7 +167,7 @@ private RowsCollector buildScanner(BlockingQueue<Row> processorQueue, List<Slice
job.getKeyFilter(), processorQueue);
} else {
return new MultiThreadsRowsCollector(store, storeFeatures, storeTx, queries,
job.getKeyFilter(), processorQueue, graphConfiguration);
job.getKeyFilter(), job.getKeysToScan(), processorQueue, graphConfiguration);
}
}

janusgraph-core/src/main/java/org/janusgraph/diskstorage/util/MetricInstrumentedStore.java
@@ -165,6 +165,18 @@
});
}

@Override
public KeyIterator getKeys(final List<StaticBuffer> keys, final SliceQuery query, final StoreTransaction txh) throws BackendException {
return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> {
final KeyIterator ki = backend.getKeys(keys, query, txh);

if (txh.getConfiguration().hasGroupName()) {
return MetricInstrumentedIterator.of(ki, txh.getConfiguration().getGroupName(), metricsStoreName, M_GET_KEYS, M_ITERATOR);

} else {
return ki;

}
});
}

@Override
public KeyIterator getKeys(final KeyRangeQuery query, final StoreTransaction txh) throws BackendException {
return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> {
@@ -830,6 +830,10 @@ public boolean apply(@Nullable String s) {
"up to this many elements.",
ConfigOption.Type.MASKABLE, 100);

public static final ConfigOption<Integer> KEYS_SIZE = new ConfigOption<>(STORAGE_NS,"keys-size",
"The maximum number of keys/partitions JanusGraph retrieves from the distributed storage system in a single request.",
ConfigOption.Type.MASKABLE, 100);

Review comment (Member) on lines +833 to +836:

It seems this config was not added to the configuration reference, which is why CI is failing. Could you please execute mvn --quiet clean install -DskipTests=true -pl janusgraph-doc -am and amend your commit? This will automatically regenerate the configuration reference documentation.

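For illustration, the new option would be set like any other storage option when opening a graph. The full key storage.keys-size (STORAGE_NS plus "keys-size"), the cql backend, and the value 500 below are assumptions for this sketch, not part of the PR.

import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class KeysSizeConfigSketch {

    public static JanusGraph open() {
        return JanusGraphFactory.build()
                .set("storage.backend", "cql")          // placeholder backend
                .set("storage.hostname", "127.0.0.1")   // placeholder host
                // Maximum number of keys/partitions fetched per storage request (defaults to 100).
                .set("storage.keys-size", 500)
                .open();
    }
}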
public static final ConfigOption<Boolean> DROP_ON_CLEAR = new ConfigOption<>(STORAGE_NS, "drop-on-clear",
"Whether to drop the graph database (true) or delete rows (false) when clearing storage. " +
"Note that some backends always drop the graph database when clearing storage. Also note that indices are " +
janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementSystem.java
@@ -914,11 +914,20 @@
--------------- */
@Override
public ScanJobFuture updateIndex(Index index, SchemaAction updateAction) {
return updateIndex(index, updateAction, Runtime.getRuntime().availableProcessors());
return updateIndex(index, updateAction, null, Runtime.getRuntime().availableProcessors());
}

@Override
public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int numOfThreads) {
return updateIndex(index, updateAction, null, numOfThreads);
}

@Override
public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<Object> vertexOnly) {
return updateIndex(index, updateAction, vertexOnly, Runtime.getRuntime().availableProcessors());
}

private ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<Object> vertexOnly, int numOfThreads) {

Preconditions.checkArgument(index != null, "Need to provide an index");
Preconditions.checkArgument(updateAction != null, "Need to provide update action");

Expand Down Expand Up @@ -967,7 +976,7 @@
builder.setFinishJob(indexId.getIndexJobFinisher(graph, SchemaAction.ENABLE_INDEX));
builder.setJobId(indexId);
builder.setNumProcessingThreads(numOfThreads);
builder.setJob(VertexJobConverter.convert(graph, new IndexRepairJob(indexId.indexName, indexId.relationTypeName)));
builder.setJob(VertexJobConverter.convert(graph, new IndexRepairJob(indexId.indexName, indexId.relationTypeName), vertexOnly));
try {
future = builder.execute();
} catch (BackendException e) {
@@ -28,7 +28,7 @@
public abstract class AbstractScanJob implements ScanJob {
protected final GraphProvider graph;
protected StandardJanusGraphTx tx;
private IDManager idManager;
protected IDManager idManager;

public AbstractScanJob(JanusGraph graph) {
this.graph = new GraphProvider();
janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java
@@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* @author Matthias Broecheler ([email protected])
@@ -50,23 +51,31 @@

protected final VertexScanJob job;

protected final List<Object> vertexIdsToScan;

protected VertexJobConverter(JanusGraph graph, VertexScanJob job) {
this(graph, job, null);
}

protected VertexJobConverter(JanusGraph graph, VertexScanJob job, List<Object> vertexIdsToScan) {
super(graph);
Preconditions.checkArgument(job!=null);
this.job = job;
this.vertexIdsToScan = vertexIdsToScan;
}

protected VertexJobConverter(VertexJobConverter copy) {
super(copy);
this.job = copy.job.clone();
this.vertexIdsToScan = copy.vertexIdsToScan;
}

public static ScanJob convert(JanusGraph graph, VertexScanJob vertexJob) {
return new VertexJobConverter(graph,vertexJob);
public static ScanJob convert(JanusGraph graph, VertexScanJob vertexJob, List<Object> vertexIdsToScan) {
return new VertexJobConverter(graph, vertexJob, vertexIdsToScan);
}

public static ScanJob convert(VertexScanJob vertexJob) {
return new VertexJobConverter(null,vertexJob);
return new VertexJobConverter(null, vertexJob, null);

}

@Override
@@ -130,6 +139,18 @@
}
}

@Override
public List<StaticBuffer> getKeysToScan() {
if (this.vertexIdsToScan == null) {
return null;
} else {
return this.vertexIdsToScan
.stream()
.map(k -> idManager.getKey(k))
.collect(Collectors.toList());
}
}

@Override
public Predicate<StaticBuffer> getKeyFilter() {
return buffer -> !IDManager.VertexIDType.Invisible.is(getVertexId(buffer));