Skip to content

Commit

Permalink
Alerts for data transformers (#758)
Browse files Browse the repository at this point in the history
  • Loading branch information
damirabdul authored Jun 21, 2022
1 parent 7c8c128 commit 18154ae
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
/**
 * Kinds of alerts the platform can raise.
 *
 * NOTE(review): this is a diff hunk rendered without +/- markers, so a removed line and
 * its replacement both appear in the body below.
 */
public enum AlertTypeEnum {
DISTRIBUTION_ANOMALY,
BACKWARDS_INCOMPATIBLE_SCHEMA,
// NOTE(review): presumably the pre-change line (no trailing comma), superseded by the
// next line - confirm against the commit.
FAILED_DQ_TEST
FAILED_DQ_TEST,
// New constant in this hunk; used for alerts built from failed data transformer runs.
FAILED_JOB
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
/**
 * Locates alerts from ingestion results. Every method returns a list of {@link AlertPojo}.
 *
 * NOTE(review): this is a diff hunk rendered without +/- markers, so the old and the new
 * declaration of the task-run method both appear below.
 */
public interface AlertLocator {
// Builds alerts from dataset structure deltas keyed by a string
// (presumably the dataset ODDRN - TODO confirm).
List<AlertPojo> locateDatasetBackIncSchema(final Map<String, DatasetStructureDelta> structureDeltas);

// NOTE(review): presumably the pre-change declaration, replaced by
// locateDataEntityRunFailed on the next line - confirm against the commit.
List<AlertPojo> locateDataQualityTestRunFailed(final List<IngestionTaskRun> taskRuns);
// Builds alerts for the given ingestion task runs; the visible implementation keeps only
// runs with a bad status and handles data quality test runs and data transformer runs.
List<AlertPojo> locateDataEntityRunFailed(final List<IngestionTaskRun> taskRuns);

// Builds alerts from data entity specific attribute deltas.
List<AlertPojo> locateEarlyBackIncSchema(final List<DataEntitySpecificAttributesDelta> deltas);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.SetUtils;
import org.opendatadiscovery.oddplatform.dto.DataEntityClassDto;
import org.opendatadiscovery.oddplatform.dto.DataEntitySpecificAttributesDelta;
Expand Down Expand Up @@ -46,29 +47,18 @@ public List<AlertPojo> locateDatasetBackIncSchema(final Map<String, DatasetStruc
}

// NOTE(review): this is a diff hunk rendered without +/- markers. Lines of the old
// locateDataQualityTestRunFailed implementation are interleaved with lines of the new
// locateDataEntityRunFailed implementation; the comments below mark which is which as far
// as can be told from the text - confirm against the commit.
//
// New behaviour, as far as the visible lines show: keep task runs whose status is in
// TASK_RUN_BAD_STATUSES, group them by run type, return an empty list when nothing failed,
// otherwise concatenate the alerts for DATA_QUALITY_TEST_RUN and DATA_TRANSFORMER_RUN runs.
@Override
// Old signature and the start of the old pipeline (type filter on DATA_QUALITY_TEST_RUN).
public List<AlertPojo> locateDataQualityTestRunFailed(final List<IngestionTaskRun> taskRuns) {
final List<IngestionTaskRun> failedTaskRuns = taskRuns.stream()
.filter(tr -> tr.getType().equals(IngestionTaskRun.IngestionTaskRunType.DATA_QUALITY_TEST_RUN))
// New signature; failed runs are now grouped by their run type instead of pre-filtered.
public List<AlertPojo> locateDataEntityRunFailed(final List<IngestionTaskRun> taskRuns) {
final Map<IngestionTaskRun.IngestionTaskRunType, List<IngestionTaskRun>> failedTaskRunsMap = taskRuns.stream()
// Status filter appears once, presumably shared by both versions.
.filter(tr -> TASK_RUN_BAD_STATUSES.contains(tr.getStatus()))
// Old terminal operation and emptiness check.
.toList();
if (failedTaskRuns.isEmpty()) {
// New terminal operation and emptiness check.
.collect(Collectors.groupingBy(IngestionTaskRun::getType));
if (failedTaskRunsMap.isEmpty()) {
return List.of();
}
// Old inline alert construction; note that it streamed over taskRuns rather than
// failedTaskRuns when building alerts.
final Map<String, List<DataQualityTestRelationsPojo>> relationsMap = dataQualityTestRelationRepository
.getRelations(failedTaskRuns.stream().map(IngestionTaskRun::getDataEntityOddrn).toList())
.stream()
.collect(Collectors.groupingBy(DataQualityTestRelationsPojo::getDataQualityTestOddrn));
return taskRuns.stream()
.flatMap(tr -> {
final List<DataQualityTestRelationsPojo> relatedPojos =
relationsMap.getOrDefault(tr.getDataEntityOddrn(), List.of());
return relatedPojos.stream().map(p -> buildAlert(
p.getDatasetOddrn(),
AlertTypeEnum.FAILED_DQ_TEST,
tr.getOddrn(),
String.format("Test %s failed with status %s", tr.getTaskName(), tr.getStatus())
));
}).toList();
// New delegation: Map.get yields null for a run type with no failed runs; both helpers
// guard against that with CollectionUtils.isEmpty.
final List<AlertPojo> dqAlerts = locateDataQualityTestAlerts(
failedTaskRunsMap.get(IngestionTaskRun.IngestionTaskRunType.DATA_QUALITY_TEST_RUN));
final List<AlertPojo> jobAlerts = locateDataTransformerAlerts(
failedTaskRunsMap.get(IngestionTaskRun.IngestionTaskRunType.DATA_TRANSFORMER_RUN));
return Stream.concat(dqAlerts.stream(), jobAlerts.stream()).toList();
}

@Override
Expand All @@ -84,6 +74,40 @@ public List<AlertPojo> locateEarlyBackIncSchema(final List<DataEntitySpecificAtt
return Stream.concat(transformerAlerts, consumerAlerts).collect(Collectors.toList());
}

/**
 * Produces one {@code FAILED_JOB} alert per given data transformer run.
 *
 * @param failedDataTransformerRuns runs to report; may be {@code null} or empty
 * @return the alerts in the order of the given runs, or an empty list when there are no runs
 */
private List<AlertPojo> locateDataTransformerAlerts(final List<IngestionTaskRun> failedDataTransformerRuns) {
    // The caller passes the result of a Map lookup, so a missing run type arrives as null.
    final boolean nothingToReport = failedDataTransformerRuns == null || failedDataTransformerRuns.isEmpty();
    if (nothingToReport) {
        return List.of();
    }
    return failedDataTransformerRuns.stream()
        .map(this::toFailedJobAlert)
        .toList();
}

/**
 * Builds the {@code FAILED_JOB} alert for a single run, attached to the run's own data entity.
 */
private AlertPojo toFailedJobAlert(final IngestionTaskRun run) {
    return buildAlert(
        run.getDataEntityOddrn(),
        AlertTypeEnum.FAILED_JOB,
        run.getOddrn(),
        String.format("Job %s failed with status %s", run.getTaskName(), run.getStatus())
    );
}

/**
 * Produces {@code FAILED_DQ_TEST} alerts for the given data quality test runs.
 *
 * <p>Relations are fetched once for all runs and grouped by data quality test ODDRN. Each run
 * then yields one alert per relation found under its data entity ODDRN, attached to the
 * relation's dataset ODDRN. Runs without relations yield no alerts.
 *
 * @param failedDQTestRuns runs to report; may be {@code null} or empty
 * @return the alerts, or an empty list when there are no runs
 */
private List<AlertPojo> locateDataQualityTestAlerts(final List<IngestionTaskRun> failedDQTestRuns) {
    // The caller passes the result of a Map lookup, so a missing run type arrives as null.
    if (failedDQTestRuns == null || failedDQTestRuns.isEmpty()) {
        return List.of();
    }

    final List<String> testOddrns = failedDQTestRuns.stream()
        .map(IngestionTaskRun::getDataEntityOddrn)
        .toList();

    final Map<String, List<DataQualityTestRelationsPojo>> relationsByTestOddrn =
        dataQualityTestRelationRepository.getRelations(testOddrns)
            .stream()
            .collect(Collectors.groupingBy(DataQualityTestRelationsPojo::getDataQualityTestOddrn));

    return failedDQTestRuns.stream()
        .flatMap(run -> relationsByTestOddrn
            .getOrDefault(run.getDataEntityOddrn(), List.of())
            .stream()
            .map(relation -> toFailedTestAlert(relation, run)))
        .toList();
}

/**
 * Builds the {@code FAILED_DQ_TEST} alert for one (relation, run) pair.
 */
private AlertPojo toFailedTestAlert(final DataQualityTestRelationsPojo relation, final IngestionTaskRun run) {
    return buildAlert(
        relation.getDatasetOddrn(),
        AlertTypeEnum.FAILED_DQ_TEST,
        run.getOddrn(),
        String.format("Test %s failed with status %s", run.getTaskName(), run.getStatus())
    );
}

private Stream<AlertPojo> locateAlertsInDSDelta(final Map.Entry<String, DatasetStructureDelta> e) {
final Map<DatasetFieldKey, DatasetFieldPojo> latestVersionFields = e.getValue().getLatest()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public Mono<Void> ingest(final DataEntityList dataEntityList) {

final List<AlertPojo> alerts = Stream.of(
alertLocator.locateDatasetBackIncSchema(datasetStructureDelta),
alertLocator.locateDataQualityTestRunFailed(dataStructure.getTaskRuns()),
alertLocator.locateDataEntityRunFailed(dataStructure.getTaskRuns()),
dataStructure.getEarlyAlerts()
).flatMap(List::stream).collect(Collectors.toList());

Expand Down
1 change: 1 addition & 0 deletions odd-platform-specification/components.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1587,6 +1587,7 @@ components:
- DISTRIBUTION_ANOMALY
- BACKWARDS_INCOMPATIBLE_SCHEMA
- FAILED_DQ_TEST
- FAILED_JOB

AlertStatus:
type: string
Expand Down

0 comments on commit 18154ae

Please sign in to comment.