Skip to content

Commit

Permalink
collecting stats during harvester
Browse files Browse the repository at this point in the history
  • Loading branch information
ndc-dxc committed Nov 23, 2024
1 parent 5ca8aec commit 75a1939
Show file tree
Hide file tree
Showing 26 changed files with 311 additions and 34 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ jacocoTestCoverageVerification {
'it.gov.innovazione.ndc.eventhandler.*',
'it.gov.innovazione.ndc.harvester.service.RepositoryService',
'it.gov.innovazione.ndc.harvester.service.HarvesterRunService',
'it.gov.innovazione.ndc.harvester.service.SemanticContentStatsService',
'it.gov.innovazione.ndc.harvester.SecurityUtils',
'it.gov.innovazione.ndc.*Exception',
'it.gov.innovazione.ndc.harvester.context.*',
Expand All @@ -195,7 +196,7 @@ jacocoTestCoverageVerification {
'it.gov.innovazione.ndc.repository.TripleStoreRepository',
'it.gov.innovazione.ndc.service.EventCleaner',
'it.gov.innovazione.ndc.service.TemplateService',
'it.gov.innovazione.ndc.harvester.service.startupjob.TempEraserStartupJob',
'it.gov.innovazione.ndc.harvester.service.startupjob.*',
'it.gov.innovazione.ndc.alerter.*'
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import it.gov.innovazione.ndc.harvester.HarvesterService;
import it.gov.innovazione.ndc.harvester.model.index.SemanticAssetMetadata;
import it.gov.innovazione.ndc.harvester.service.RepositoryService;
import it.gov.innovazione.ndc.harvester.service.SemanticContentStatsService;
import it.gov.innovazione.ndc.harvester.service.startupjob.StartupJobsRunner;
import it.gov.innovazione.ndc.repository.TripleStoreProperties;
import org.elasticsearch.client.RestClient;
Expand Down Expand Up @@ -66,6 +67,9 @@ public class BaseIntegrationTest {
@SpyBean
RepositoryService repositoryService;

@SpyBean
SemanticContentStatsService semanticContentStatsService;

@Autowired
TripleStoreProperties virtuosoProps;

Expand Down Expand Up @@ -96,6 +100,7 @@ private void dataIsHarvested() throws IOException {
doReturn(cloneDir).when(agencyRepositoryService).cloneRepo(REPO_URL, null);
doNothing().when(agencyRepositoryService).removeClonedRepo(cloneDir);
doNothing().when(repositoryService).storeRightsHolders(any(), any());
doNothing().when(semanticContentStatsService).saveStats();

harvesterService.harvest(asRepo(REPO_URL));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package it.gov.innovazione.ndc.harvester.context;

import it.gov.innovazione.ndc.model.harvester.SemanticContentStats;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.List;

@NoArgsConstructor(access = lombok.AccessLevel.PRIVATE)
public class HarvestExecutionContextUtils {

private static final ThreadLocal<HarvestExecutionContext> CONTEXT_HOLDER = new ThreadLocal<>();
private static final ThreadLocal<List<SemanticContentStats>> SEMANTIC_CONTENT_STATS_HOLDER = ThreadLocal.withInitial(ArrayList::new);

public static HarvestExecutionContext getContext() {
return CONTEXT_HOLDER.get();
Expand All @@ -17,5 +22,18 @@ public static void setContext(HarvestExecutionContext context) {

/**
 * Resets the per-thread harvest state: the semantic content stats collected so far
 * and the current {@link HarvestExecutionContext}.
 */
public static void clearContext() {
    // The two holders are separate thread-locals, so the resets do not depend on each other.
    clearSemanticContentStats();
    CONTEXT_HOLDER.remove();
}

/**
 * Gives access to the semantic content stats gathered on the calling thread.
 *
 * @return the list stored in the thread-local holder itself (not a copy), so entries
 *         added through it are visible to later calls made on the same thread
 */
public static List<SemanticContentStats> getSemanticContentStats() {
    final List<SemanticContentStats> statsForThisThread = SEMANTIC_CONTENT_STATS_HOLDER.get();
    return statsForThisThread;
}

/**
 * Appends one stats entry to the list kept for the calling thread.
 *
 * @param stats the entry to record; it is stored as given, without any validation
 */
public static void addSemanticContentStat(SemanticContentStats stats) {
    final List<SemanticContentStats> statsForThisThread = SEMANTIC_CONTENT_STATS_HOLDER.get();
    statsForThisThread.add(stats);
}

/**
 * Discards the semantic content stats collected on the calling thread.
 *
 * <p>The list is emptied first, so code that still holds the reference returned by
 * {@code getSemanticContentStats()} sees an empty list exactly as before. The thread-local
 * entry is then removed, mirroring what {@code clearContext()} does for the execution
 * context, so the list does not stay attached to the thread after the harvest is over.
 * The next read of the holder lazily creates a fresh empty list.
 *
 * <p>NOTE(review): the removal matters most if harvesting runs on reused (pooled)
 * threads; confirm against the executor configuration.
 */
public static void clearSemanticContentStats() {
    SEMANTIC_CONTENT_STATS_HOLDER.get().clear();
    SEMANTIC_CONTENT_STATS_HOLDER.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
import it.gov.innovazione.ndc.harvester.context.HarvestExecutionContextUtils;
import it.gov.innovazione.ndc.harvester.exception.SinglePathProcessingException;
import it.gov.innovazione.ndc.harvester.harvesters.utils.PathUtils;
import it.gov.innovazione.ndc.harvester.model.HarvesterStatsHolder;
import it.gov.innovazione.ndc.harvester.model.Instance;
import it.gov.innovazione.ndc.harvester.model.SemanticAssetPath;
import it.gov.innovazione.ndc.harvester.service.SemanticContentStatsService;
import it.gov.innovazione.ndc.model.harvester.Repository;
import it.gov.innovazione.ndc.model.harvester.SemanticContentStats;
import it.gov.innovazione.ndc.service.logging.HarvesterStage;
import it.gov.innovazione.ndc.service.logging.LoggingContext;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -45,6 +48,7 @@ public abstract class BaseSemanticAssetHarvester<P extends SemanticAssetPath> im
private final SemanticAssetType type;
private final NdcEventPublisher eventPublisher;
private final ConfigService configService;
private final SemanticContentStatsService semanticContentStatsService;

@Override
public SemanticAssetType getType() {
Expand All @@ -70,7 +74,8 @@ public void harvest(Repository repository, Path rootPath) {

for (P path : paths) {
try {
processPath(repository.getUrl(), path);
HarvesterStatsHolder harvesterStatsHolder = processPath(repository.getUrl(), path);
semanticContentStatsService.updateStats(harvesterStatsHolder);
log.debug("Path {} processed correctly for {}", path, type);
} catch (SinglePathProcessingException e) {
boolean isInfrastuctureError = checkInfrastructureError(e);
Expand Down Expand Up @@ -112,6 +117,7 @@ public void harvest(Repository repository, Path rootPath) {
}
}
}
semanticContentStatsService.saveStats();
}

private boolean checkInfrastructureError(SinglePathProcessingException e) {
Expand Down Expand Up @@ -190,7 +196,7 @@ public void cleanUpBeforeHarvesting(String repoUrl, Instance instance) {
// by default nothing specific
}

protected abstract void processPath(String repoUrl, P path);
protected abstract HarvesterStatsHolder processPath(String repoUrl, P path);

protected abstract List<P> scanForPaths(Path rootPath);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import it.gov.innovazione.ndc.harvester.AgencyRepositoryService;
import it.gov.innovazione.ndc.harvester.SemanticAssetType;
import it.gov.innovazione.ndc.harvester.model.CvPath;
import it.gov.innovazione.ndc.harvester.model.HarvesterStatsHolder;
import it.gov.innovazione.ndc.harvester.model.Instance;
import it.gov.innovazione.ndc.harvester.pathprocessors.ControlledVocabularyPathProcessor;
import it.gov.innovazione.ndc.harvester.service.SemanticContentStatsService;
import org.springframework.stereotype.Component;

import java.nio.file.Path;
Expand All @@ -17,15 +19,20 @@ public class ControlledVocabularyHarvester extends BaseSemanticAssetHarvester<Cv
private final AgencyRepositoryService agencyRepositoryService;
private final ControlledVocabularyPathProcessor pathProcessor;

public ControlledVocabularyHarvester(AgencyRepositoryService agencyRepositoryService, ControlledVocabularyPathProcessor pathProcessor, NdcEventPublisher ndcEventPublisher, ConfigService configService) {
super(SemanticAssetType.CONTROLLED_VOCABULARY, ndcEventPublisher, configService);
/**
 * Creates the harvester registered for {@link SemanticAssetType#CONTROLLED_VOCABULARY}.
 *
 * @param agencyRepositoryService     kept in a field of this harvester
 * @param pathProcessor               kept in a field of this harvester
 * @param ndcEventPublisher           passed to the superclass constructor
 * @param configService               passed to the superclass constructor
 * @param semanticContentStatsService passed to the superclass constructor
 */
public ControlledVocabularyHarvester(
AgencyRepositoryService agencyRepositoryService,
ControlledVocabularyPathProcessor pathProcessor,
NdcEventPublisher ndcEventPublisher,
ConfigService configService,
SemanticContentStatsService semanticContentStatsService) {
super(SemanticAssetType.CONTROLLED_VOCABULARY, ndcEventPublisher, configService, semanticContentStatsService);
this.agencyRepositoryService = agencyRepositoryService;
this.pathProcessor = pathProcessor;
}

@Override
protected void processPath(String repoUrl, CvPath path) {
pathProcessor.process(repoUrl, path);
protected HarvesterStatsHolder processPath(String repoUrl, CvPath path) {
return pathProcessor.process(repoUrl, path);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import it.gov.innovazione.ndc.eventhandler.event.ConfigService;
import it.gov.innovazione.ndc.harvester.AgencyRepositoryService;
import it.gov.innovazione.ndc.harvester.SemanticAssetType;
import it.gov.innovazione.ndc.harvester.model.HarvesterStatsHolder;
import it.gov.innovazione.ndc.harvester.model.SemanticAssetPath;
import it.gov.innovazione.ndc.harvester.pathprocessors.OntologyPathProcessor;
import it.gov.innovazione.ndc.harvester.service.SemanticContentStatsService;
import org.springframework.stereotype.Component;

import java.nio.file.Path;
Expand All @@ -16,15 +18,19 @@ public class OntologyHarvester extends BaseSemanticAssetHarvester<SemanticAssetP
private final AgencyRepositoryService agencyRepositoryService;
private final OntologyPathProcessor pathProcessor;

public OntologyHarvester(AgencyRepositoryService agencyRepositoryService, OntologyPathProcessor pathProcessor, NdcEventPublisher ndcEventPublisher, ConfigService configService) {
super(SemanticAssetType.ONTOLOGY, ndcEventPublisher, configService);
/**
 * Creates the harvester registered for {@link SemanticAssetType#ONTOLOGY}.
 *
 * @param agencyRepositoryService     kept in a field of this harvester
 * @param pathProcessor               kept in a field of this harvester
 * @param ndcEventPublisher           passed to the superclass constructor
 * @param configService               passed to the superclass constructor
 * @param semanticContentStatsService passed to the superclass constructor
 */
public OntologyHarvester(AgencyRepositoryService agencyRepositoryService,
OntologyPathProcessor pathProcessor,
NdcEventPublisher ndcEventPublisher,
ConfigService configService,
SemanticContentStatsService semanticContentStatsService) {
super(SemanticAssetType.ONTOLOGY, ndcEventPublisher, configService, semanticContentStatsService);
this.agencyRepositoryService = agencyRepositoryService;
this.pathProcessor = pathProcessor;
}

@Override
protected void processPath(String repoUrl, SemanticAssetPath path) {
pathProcessor.process(repoUrl, path);
protected HarvesterStatsHolder processPath(String repoUrl, SemanticAssetPath path) {
return pathProcessor.process(repoUrl, path);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import it.gov.innovazione.ndc.eventhandler.event.ConfigService;
import it.gov.innovazione.ndc.harvester.AgencyRepositoryService;
import it.gov.innovazione.ndc.harvester.SemanticAssetType;
import it.gov.innovazione.ndc.harvester.model.HarvesterStatsHolder;
import it.gov.innovazione.ndc.harvester.model.SemanticAssetPath;
import it.gov.innovazione.ndc.harvester.pathprocessors.SchemaPathProcessor;
import it.gov.innovazione.ndc.harvester.service.SemanticContentStatsService;
import org.springframework.stereotype.Component;

import java.nio.file.Path;
Expand All @@ -16,15 +18,19 @@ public class SchemaHarvester extends BaseSemanticAssetHarvester<SemanticAssetPat
private final AgencyRepositoryService agencyRepositoryService;
private final SchemaPathProcessor pathProcessor;

public SchemaHarvester(AgencyRepositoryService agencyRepositoryService, SchemaPathProcessor pathProcessor, NdcEventPublisher ndcEventPublisher, ConfigService configService) {
super(SemanticAssetType.SCHEMA, ndcEventPublisher, configService);
/**
 * Creates the harvester registered for {@link SemanticAssetType#SCHEMA}.
 *
 * @param agencyRepositoryService     kept in a field of this harvester
 * @param pathProcessor               kept in a field of this harvester
 * @param ndcEventPublisher           passed to the superclass constructor
 * @param configService               passed to the superclass constructor
 * @param semanticContentStatsService passed to the superclass constructor
 */
public SchemaHarvester(AgencyRepositoryService agencyRepositoryService,
SchemaPathProcessor pathProcessor,
NdcEventPublisher ndcEventPublisher,
ConfigService configService,
SemanticContentStatsService semanticContentStatsService) {
super(SemanticAssetType.SCHEMA, ndcEventPublisher, configService, semanticContentStatsService);
this.agencyRepositoryService = agencyRepositoryService;
this.pathProcessor = pathProcessor;
}

@Override
protected void processPath(String repoUrl, SemanticAssetPath path) {
pathProcessor.process(repoUrl, path);
protected HarvesterStatsHolder processPath(String repoUrl, SemanticAssetPath path) {
return pathProcessor.process(repoUrl, path);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package it.gov.innovazione.ndc.harvester.model;

import it.gov.innovazione.ndc.harvester.model.index.SemanticAssetMetadata;
import lombok.Builder;
import lombok.Data;

/**
 * Value returned by the path processors for one processed path: the metadata of the
 * semantic asset together with the validation error/warning counters recorded for it.
 * Instances are created through the Lombok-generated builder.
 */
@Data
@Builder
public class HarvesterStatsHolder {
// Metadata of the processed asset.
private final SemanticAssetMetadata metadata;
// Error and warning counters attributed to the processed asset.
private final SemanticAssetModelValidationContext.ValidationContextStats validationContextStats;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ public interface SemanticAssetModel {
Model getRdfModel();

SemanticAssetMetadata extractMetadata();

SemanticAssetModelValidationContext getValidationContext();
}

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ public static SemanticAssetModelValidationContext getForValidation() {
return builder().build();
}

/**
 * Computes, counter by counter, how much two stats snapshots differ.
 *
 * @param contextBefore the snapshot whose counters are subtracted
 * @param contextAfter  the snapshot the counters are subtracted from
 * @return stats whose errors and warnings are {@code contextAfter} minus {@code contextBefore}
 */
public static ValidationContextStats difference(ValidationContextStats contextBefore, ValidationContextStats contextAfter) {
    int errorDelta = contextAfter.getErrors() - contextBefore.getErrors();
    int warningDelta = contextAfter.getWarnings() - contextBefore.getWarnings();
    return ValidationContextStats.builder()
            .errors(errorDelta)
            .warnings(warningDelta)
            .build();
}

public SemanticAssetModelValidationContext withFieldName(String fieldName) {
return toBuilder().fieldName(fieldName).build();
}
Expand Down Expand Up @@ -107,4 +114,25 @@ public void setWarningValidationType() {
private enum ValidationContextType {
ERROR, WARNING
}

/**
 * Immutable pair of counters: how many errors and how many warnings a validation
 * context holds.
 */
@Data
@Builder
public static class ValidationContextStats {
    private final int errors;
    private final int warnings;

    /**
     * Snapshots the current size of the error and warning collections of a context.
     *
     * @param validationContext the context to count; must not be {@code null}
     * @return stats carrying the two sizes
     */
    public static ValidationContextStats of(SemanticAssetModelValidationContext validationContext) {
        int errorCount = validationContext.getErrors().size();
        int warningCount = validationContext.getWarnings().size();
        return ValidationContextStats.builder().errors(errorCount).warnings(warningCount).build();
    }

    /**
     * @return stats with both counters set to zero
     */
    public static ValidationContextStats empty() {
        return ValidationContextStats.builder().errors(0).warnings(0).build();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import it.gov.innovazione.ndc.harvester.context.HarvestExecutionContext;
import it.gov.innovazione.ndc.harvester.context.HarvestExecutionContextUtils;
import it.gov.innovazione.ndc.harvester.exception.SinglePathProcessingException;
import it.gov.innovazione.ndc.harvester.model.HarvesterStatsHolder;
import it.gov.innovazione.ndc.harvester.model.SemanticAssetModel;
import it.gov.innovazione.ndc.harvester.model.SemanticAssetModelValidationContext;
import it.gov.innovazione.ndc.harvester.model.SemanticAssetPath;
import it.gov.innovazione.ndc.harvester.model.extractors.RightsHolderExtractor;
import it.gov.innovazione.ndc.harvester.model.index.NodeSummary;
Expand Down Expand Up @@ -43,12 +45,25 @@ private static <M extends SemanticAssetModel> SemanticAssetMetadata tryExtractMe
}
}

protected void processWithModel(String repoUrl, P path, M model) {
protected HarvesterStatsHolder processWithModel(String repoUrl, P path, M model) {
log.debug("Enriching model before persisting");
enrichModelBeforePersisting(model, path);
indexMetadataForSearch(model);
SemanticAssetModelValidationContext.ValidationContextStats statsBefore = getStats(model);
SemanticAssetMetadata meta = indexMetadataForSearch(model);
SemanticAssetModelValidationContext.ValidationContextStats statsAfter = getStats(model);
persistModelToTripleStore(repoUrl, path, model);
collectRightsHolderInContext(repoUrl, model);
return HarvesterStatsHolder.builder()
.metadata(meta)
.validationContextStats(SemanticAssetModelValidationContext.difference(statsBefore, statsAfter))
.build();
}

/**
 * Snapshots the validation counters of a model.
 *
 * @param model the model to inspect; may be {@code null}
 * @return the counters of the model's validation context, or empty (all-zero) stats
 *         when the model or its validation context is {@code null}
 */
private static <M extends SemanticAssetModel> SemanticAssetModelValidationContext.ValidationContextStats getStats(M model) {
    if (model == null) {
        return SemanticAssetModelValidationContext.ValidationContextStats.empty();
    }
    SemanticAssetModelValidationContext validationContext = model.getValidationContext();
    if (validationContext == null) {
        return SemanticAssetModelValidationContext.ValidationContextStats.empty();
    }
    return SemanticAssetModelValidationContext.ValidationContextStats.of(validationContext);
}

private void collectRightsHolderInContext(String repoUrl, M model) {
Expand Down Expand Up @@ -79,7 +94,7 @@ protected void enrichModelBeforePersisting(M model, P path) {
}

@Override
public void process(String repoUrl, P path) {
public HarvesterStatsHolder process(String repoUrl, P path) {
try {
log.info("Processing path {}", path);

Expand All @@ -90,8 +105,10 @@ public void process(String repoUrl, P path) {
Resource resource = model.getMainResource();
log.info("Found resource {}", resource);

processWithModel(repoUrl, path, model);
HarvesterStatsHolder harvesterStatsHolder = processWithModel(repoUrl, path, model);
log.info("Path {} processed", path);

return harvesterStatsHolder;
} catch (Exception e) {
log.error("Error processing {}", path, e);
if (e instanceof SinglePathProcessingException singlePathProcessingException) {
Expand All @@ -113,7 +130,7 @@ public void process(String repoUrl, P path) {
}
}

private void indexMetadataForSearch(M model) {
private SemanticAssetMetadata indexMetadataForSearch(M model) {
log.debug("Indexing {} for search", model.getMainResource());
SemanticAssetMetadata metadata = tryExtractMetadata(model);
postProcessMetadata(metadata);
Expand All @@ -125,6 +142,7 @@ private void indexMetadataForSearch(M model) {
.harvesterStatus(HarvesterRun.Status.RUNNING)
.additionalInfo("metadata", metadata)
.build());
return metadata;
} catch (Exception e) {
log.error("Error saving metadata for {}", model.getMainResource(), e);
throw new SinglePathProcessingException("Cannot save metadata", e, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import it.gov.innovazione.ndc.harvester.csv.CsvParser.CsvData;
import it.gov.innovazione.ndc.harvester.model.ControlledVocabularyModel;
import it.gov.innovazione.ndc.harvester.model.CvPath;
import it.gov.innovazione.ndc.harvester.model.HarvesterStatsHolder;
import it.gov.innovazione.ndc.harvester.model.Instance;
import it.gov.innovazione.ndc.harvester.model.SemanticAssetModelFactory;
import it.gov.innovazione.ndc.harvester.model.index.SemanticAssetMetadata;
Expand Down Expand Up @@ -44,8 +45,8 @@ public ControlledVocabularyPathProcessor(TripleStoreRepository tripleStoreReposi
}

@Override
protected void processWithModel(String repoUrl, CvPath path, ControlledVocabularyModel model) {
super.processWithModel(repoUrl, path, model);
protected HarvesterStatsHolder processWithModel(String repoUrl, CvPath path, ControlledVocabularyModel model) {
HarvesterStatsHolder harvesterStatsHolder = super.processWithModel(repoUrl, path, model);

path.getCsvPath().ifPresent(p -> {
String keyConcept = model.getKeyConcept();
Expand All @@ -64,6 +65,8 @@ protected void processWithModel(String repoUrl, CvPath path, ControlledVocabular

parseAndIndexCsv(vocabularyIdentifier, p);
});

return harvesterStatsHolder;
}

@Override
Expand Down
Loading

0 comments on commit 75a1939

Please sign in to comment.