diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/mappers/UrnSearchAcrossLineageResultsMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/mappers/UrnSearchAcrossLineageResultsMapper.java index b39b960bb75801..b85303909c0801 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/mappers/UrnSearchAcrossLineageResultsMapper.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/mappers/UrnSearchAcrossLineageResultsMapper.java @@ -16,6 +16,7 @@ import com.linkedin.metadata.search.LineageSearchEntity; import com.linkedin.metadata.search.LineageSearchResult; import com.linkedin.metadata.search.SearchResultMetadata; +import java.util.ArrayList; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -69,7 +70,8 @@ private SearchAcrossLineageResult mapResult( .map(p -> mapPath(context, p)) .collect(Collectors.toList())) .setDegree(searchEntity.getDegree()) - .setDegrees(searchEntity.getDegrees().stream().collect(Collectors.toList())) + .setDegrees(new ArrayList<>(searchEntity.getDegrees())) + .setExplored(Boolean.TRUE.equals(searchEntity.isExplored())) .build(); } diff --git a/datahub-graphql-core/src/main/resources/search.graphql b/datahub-graphql-core/src/main/resources/search.graphql index 2b29994332d07a..13c1ff2e8a7648 100644 --- a/datahub-graphql-core/src/main/resources/search.graphql +++ b/datahub-graphql-core/src/main/resources/search.graphql @@ -644,7 +644,7 @@ type ScrollResults { } """ -Results returned by issueing a search across relationships query +Results returned by issuing a search across relationships query """ type SearchAcrossLineageResults { """ @@ -679,7 +679,7 @@ type SearchAcrossLineageResults { } """ -Results returned by issueing a search across relationships query using scroll API +Results returned by issuing a search across relationships query using scroll API """ type ScrollAcrossLineageResults { """ @@ -742,6 
+742,11 @@ type SearchAcrossLineageResult { """ degrees: [Int!] + """ + Marks whether or not this entity was explored further for lineage + """ + explored: Boolean! + } """ diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java index 15a16833aeb7bc..ea8d8fea54633c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java @@ -345,6 +345,8 @@ private Stream processOneHopLineage( int i) { // Do one hop on the lineage graph + int numHops = i + 1; // Zero indexed for loop counter, one indexed count + int remainingHops = maxHops - numHops; List oneHopRelationships = getLineageRelationshipsInBatches( currentLevel, @@ -352,8 +354,8 @@ private Stream processOneHopLineage( graphFilters, visitedEntities, viaEntities, - i + 1, - maxHops - (i + 1), + numHops, + remainingHops, remainingTime, existingPaths, exploreMultiplePaths, @@ -387,8 +389,8 @@ private Stream processOneHopLineage( || platformMatches( lineageRelationship.getEntity(), ignoreAsHops.get(entityType))))) - .forEach( - lineageRelationship -> additionalCurrentLevel.add(lineageRelationship.getEntity())); + .map(LineageRelationship::getEntity) + .forEach(additionalCurrentLevel::add); if (!additionalCurrentLevel.isEmpty()) { Stream ignoreAsHopUrns = processOneHopLineage( @@ -417,6 +419,14 @@ private Stream processOneHopLineage( .sorted(Comparator.comparing(Urn::toString)) .limit(lineageFlags.getEntitiesExploredPerHopLimit()); } + if (remainingHops > 0) { + // If there are hops remaining, we expect to explore everything getting passed back to the + // loop, barring a timeout + List<Urn> entitiesToExplore = intermediateStream.collect(Collectors.toList()); + entitiesToExplore.forEach(urn -> result.get(urn).setExplored(true)); + // reassign the stream after consuming it + 
intermediateStream = entitiesToExplore.stream(); + } } return intermediateStream; } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java index 3ea117663c23da..bb316f6f2b41c3 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java @@ -738,6 +738,7 @@ private LineageSearchEntity buildLineageSearchEntity( if (lineageRelationship.hasDegrees()) { entity.setDegrees(lineageRelationship.getDegrees()); } + entity.setExplored(Boolean.TRUE.equals(lineageRelationship.isExplored())); } return entity; } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java index b389f8228a98d6..85ca7ce7a1629d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java @@ -410,6 +410,29 @@ public void testTimestampLineage() throws Exception { Assert.assertEquals(Integer.valueOf(2), downstreamResult.getTotal()); } + @Test + public void testExplored() throws Exception { + + List<Edge> edges = + Arrays.asList( + // One upstream edge + new Edge(dataset2Urn, dataset1Urn, downstreamOf, null, null, null, null, null), + // Two downstream + new Edge(dataset3Urn, dataset2Urn, downstreamOf, null, null, null, null, null), + new Edge(dataset4Urn, dataset2Urn, downstreamOf, null, null, null, null, null), + // A third downstream edge + new Edge(dataset5Urn, dataset2Urn, downstreamOf, null, null, null, null, null)); + + edges.forEach(getGraphService()::addEdge); + syncAfterWrite(); + + EntityLineageResult result = getUpstreamLineage(dataset2Urn, null, null, 10); + 
Assert.assertEquals(Boolean.TRUE, result.getRelationships().get(0).isExplored()); + + EntityLineageResult result2 = getUpstreamLineage(dataset2Urn, null, null, 10, 0); + Assert.assertNull(result2.getRelationships().get(0).isExplored()); + } + /** * Utility method to reduce repeated parameters for lineage tests * diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/graph/LineageRelationship.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/graph/LineageRelationship.pdl index c25a1cee7db474..a169157955e67b 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/graph/LineageRelationship.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/graph/LineageRelationship.pdl @@ -67,4 +67,9 @@ record LineageRelationship { * Replaces the deprecated field "degree". **/ degrees: optional array[int] + + /** + * Marks this relationship as explored during the graph walk + */ + explored: optional boolean } diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/search/LineageSearchEntity.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/search/LineageSearchEntity.pdl index e99115893712d2..fdfc8b2d53291c 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/search/LineageSearchEntity.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/search/LineageSearchEntity.pdl @@ -29,4 +29,9 @@ record LineageSearchEntity includes SearchEntity { * The degrees of separation (number of hops) between the source and this entity */ degrees: array[int] = [] + + /** + * Marks an entity as having been explored as a part of the graph walk + */ + explored: optional boolean } \ No newline at end of file diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index 011b9e419a0c0e..4915f06ffe5d2a 100644 --- 
a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -6205,6 +6205,11 @@ }, "doc" : "The degrees of separation (number of hops) between the source and this entity ", "default" : [ ] + }, { + "name" : "explored", + "type" : "boolean", + "doc" : "Marks an entity as having been explored as a part of the graph walk", + "optional" : true } ] } }, diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.lineage.relationships.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.lineage.relationships.snapshot.json index 056ca0e4da2065..00b3c925d0e731 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.lineage.relationships.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.lineage.relationships.snapshot.json @@ -177,6 +177,11 @@ }, "doc" : "The different depths at which this entity is discovered in the lineage graph.\nMarked as optional to maintain backward compatibility, but is filled out by implementations. \nReplaces the deprecated field \"degree\".\n", "optional" : true + }, { + "name" : "explored", + "type" : "boolean", + "doc" : "Marks this relationship as explored during the graph walk", + "optional" : true } ] } },