Skip to content

Commit

Permalink
CNDE-1955: Service integration for HIV data mart post processing (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
johneipe authored Dec 13, 2024
1 parent b217ae6 commit dd39e01
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
CREATE OR ALTER PROCEDURE [dbo].[sp_hepatitis_datamart_postprocessing]
@phc_id nvarchar(max),
@pat_ids nvarchar(max) = '',
@debug bit = 'false'
AS
BEGIN
Expand Down Expand Up @@ -1054,7 +1053,7 @@ BEGIN

IF OBJECT_ID('#TMP_Investigation', 'U') IS NOT NULL
BEGIN
DROP TABLE TMP_Investigation;
DROP TABLE #TMP_Investigation;

END;

Expand Down Expand Up @@ -1213,7 +1212,7 @@ BEGIN

IF OBJECT_ID('#TMP_HEP_PAT_PROV', 'U') IS NOT NULL
BEGIN
DROP TABLE TMP_HEP_PAT_PROV;
DROP TABLE #TMP_HEP_PAT_PROV;

END;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ public interface InvestigationRepository extends JpaRepository<InvestigationResu
void executeStoredProcForFPageCase(@Param("publicHealthCaseUids") String publicHealthCaseUids);

@Procedure("sp_hepatitis_datamart_postprocessing")
void executeStoredProcForHepDatamart(
@Param("publicHealthCaseUids") String publicHealthCaseUids,
@Param("patientUids") String patientUids);
void executeStoredProcForHepDatamart(@Param("publicHealthCaseUids") String publicHealthCaseUids);

@Procedure("sp_nrt_case_count_postprocessing")
void executeStoredProcForCaseCount(@Param("healthcaseUids") String healthcaseUids);
Expand All @@ -33,4 +31,7 @@ void executeStoredProcForHepDatamart(

@Procedure("sp_f_std_page_case_postprocessing")
void executeStoredProcForFStdPageCase(@Param("publicHealthCaseUids") String publicHealthCaseUids);

@Procedure("sp_std_hiv_datamart_postprocessing")
void executeStoredProcForStdHIVDatamart(@Param("publicHealthCaseUids") String publicHealthCaseUids);
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,26 @@ enum Entity {
PATIENT(3, "patient", "patient_uid", "sp_nrt_patient_postprocessing"),
INVESTIGATION(4, "investigation", PHC_UID, "sp_nrt_investigation_postprocessing"),
NOTIFICATION(5, "notification", "notification_uid", "sp_nrt_notification_postprocessing"),
CASE_MANAGEMENT(6, "case_management", PHC_UID, "sp_nrt_case_management_postprocessing"),
INTERVIEW(7, "interview", "interview_uid", "sp_d_interview_postprocessing"),
INTERVIEW(6, "interview", "interview_uid", "sp_d_interview_postprocessing"),
CASE_MANAGEMENT(7, "case_management", PHC_UID, "sp_nrt_case_management_postprocessing"),
LDF_DATA(8, "ldf_data", "ldf_uid", "sp_nrt_ldf_postprocessing"),
OBSERVATION(9, "observation", "observation_uid", null),
F_PAGE_CASE(0, "fact page case", PHC_UID, "sp_f_page_case_postprocessing"),
CASE_ANSWERS(0, "case answers", PHC_UID, "sp_page_builder_postprocessing"),
CASE_COUNT(0, "case count", PHC_UID, "sp_nrt_case_count_postprocessing"),
F_STD_PAGE_CASE(0, "fact std page case", PHC_UID, "sp_f_std_page_case_postprocessing"),
HEPATITIS_DATAMART(0, "Hepatitis_Datamart", PHC_UID, "sp_hepatitis_datamart_postprocessing"),
STD_HIV_DATAMART(0, "Std_Hiv_Datamart", PHC_UID, "sp_std_hiv_datamart_postprocessing"),
UNKNOWN(-1, "unknown", "unknown_uid", "sp_nrt_unknown_postprocessing");

private final int priority;
private final String name;
private final String entityName;
private final String storedProcedure;
private final String uidName;

Entity(int priority, String name, String uidName, String storedProcedure) {
Entity(int priority, String entityName, String uidName, String storedProcedure) {
this.priority = priority;
this.name = name;
this.entityName = entityName;
this.storedProcedure = storedProcedure;
this.uidName = uidName;
}
Expand Down Expand Up @@ -248,14 +250,14 @@ protected void processCachedIds() {
case CASE_MANAGEMENT:
processTopic(keyTopic, entity, ids,
investigationRepository::executeStoredProcForCaseManagement);
processTopic(keyTopic, entity.getName(), ids,
processTopic(keyTopic, entity.getEntityName(), ids,
investigationRepository::executeStoredProcForFStdPageCase, "sp_f_std_page_case_postprocessing");
break;
case INTERVIEW:
processTopic(keyTopic, entity, ids,
postProcRepository::executeStoredProcForDInterview);
processTopic(keyTopic, Entity.F_STD_PAGE_CASE, ids,
postProcRepository::executeStoredProcForFInterviewCase);
processTopic(keyTopic, entity.getEntityName(), ids,
postProcRepository::executeStoredProcForFInterviewCase, "sp_f_interview_case_postprocessing");
break;
case LDF_DATA:
processTopic(keyTopic, entity, ids,
Expand All @@ -272,19 +274,19 @@ protected void processCachedIds() {
}

if (!morbIds.isEmpty()) {
processTopic(keyTopic, entity.getName(), morbIds,
processTopic(keyTopic, entity.getEntityName(), morbIds,
postProcRepository::executeStoredProcForMorbReport, "sp_d_morbidity_report_postprocessing");
}

if (!labIds.isEmpty()) {
processTopic(keyTopic, entity.getName(), labIds,
processTopic(keyTopic, entity.getEntityName(), labIds,
postProcRepository::executeStoredProcForLabTest, "sp_d_lab_test_postprocessing");
processTopic(keyTopic, entity.getName(), labIds,
processTopic(keyTopic, entity.getEntityName(), labIds,
postProcRepository::executeStoredProcForLabTestResult, "sp_d_labtest_result_postprocessing");

processTopic(keyTopic, entity.getName(), labIds,
processTopic(keyTopic, entity.getEntityName(), labIds,
postProcRepository::executeStoredProcForLab100Datamart, "sp_lab100_datamart_postprocessing");
processTopic(keyTopic, entity.getName(), labIds,
processTopic(keyTopic, entity.getEntityName(), labIds,
postProcRepository::executeStoredProcForLab101Datamart, "sp_lab101_datamart_postprocessing");
}
break;
Expand All @@ -297,8 +299,6 @@ protected void processCachedIds() {
} else {
logger.info("No ids to process from the topics.");
}


}

@Scheduled(fixedDelayString = "${service.fixed-delay.datamart}")
Expand All @@ -309,16 +309,21 @@ protected void processDatamartIds() {
Set<Map<Long, Long>> dmSet = entry.getValue();
dmCache.put(dmType, ConcurrentHashMap.newKeySet());

if (dmType.equals("Hepatitis_Datamart")) {
String cases =
dmSet.stream().flatMap(m -> m.keySet().stream().map(String::valueOf)).collect(Collectors.joining(","));
String patients =
dmSet.stream().flatMap(m -> m.values().stream().map(String::valueOf)).collect(Collectors.joining(","));
String cases =
dmSet.stream().flatMap(m -> m.keySet().stream().map(String::valueOf)).collect(Collectors.joining(","));

if (dmType.equals(Entity.HEPATITIS_DATAMART.getEntityName())) {

logger.info("Processing {} message topic. Calling stored proc: {} '{}'", dmType,
Entity.HEPATITIS_DATAMART.getStoredProcedure(), cases);
investigationRepository.executeStoredProcForHepDatamart(cases);
completeLog(Entity.HEPATITIS_DATAMART.getStoredProcedure());
} else if (dmType.equals(Entity.STD_HIV_DATAMART.getEntityName())) {

logger.info("Processing {} message topic. Calling stored proc: {} '{}','{}'", dmType,
"sp_hepatitis_datamart_postprocessing", cases, patients);
investigationRepository.executeStoredProcForHepDatamart(cases, patients);
completeLog("sp_hepatitis_datamart_postprocessing");
logger.info("Processing {} message topic. Calling stored proc: {} '{}'", dmType,
Entity.STD_HIV_DATAMART.getStoredProcedure(), cases);
investigationRepository.executeStoredProcForStdHIVDatamart(cases);
completeLog(Entity.STD_HIV_DATAMART.getStoredProcedure());
}
} else {
logger.info("No data to process from the datamart topics.");
Expand All @@ -334,12 +339,12 @@ public void shutdown() {

private String extractValFromMessage(String topic, String payload) {
try {
if (topic.endsWith(Entity.INVESTIGATION.getName())) {
if (topic.endsWith(Entity.INVESTIGATION.getEntityName())) {
JsonNode tblNode = objectMapper.readTree(payload).get(PAYLOAD).path("rdb_table_name_list");
if (!tblNode.isMissingNode() && !tblNode.isNull()) {
return tblNode.asText();
}
} else if (topic.endsWith(Entity.OBSERVATION.getName())) {
} else if (topic.endsWith(Entity.OBSERVATION.getEntityName())) {
String domainCd = objectMapper.readTree(payload).get(PAYLOAD).path("obs_domain_cd_st_1").asText();
String ctrlCd = Optional.ofNullable(objectMapper.readTree(payload).get(PAYLOAD).get("ctrl_cd_display_form"))
.filter(node -> !node.isNull()).map(JsonNode::asText).orElse(null);
Expand All @@ -366,13 +371,13 @@ private boolean assertMatches(String value, String... vals ) {
private Entity getEntityByTopic(String topic) {
return Arrays.stream(Entity.values())
.filter(entity -> entity.getPriority() > 0)
.filter(entity -> topic.endsWith(entity.getName()))
.filter(entity -> topic.endsWith(entity.getEntityName()))
.findFirst()
.orElse(Entity.UNKNOWN);
}

private void processTopic(String keyTopic, Entity entity, List<Long> ids, Consumer<String> repositoryMethod) {
processTopic(keyTopic, entity.getName(), ids, repositoryMethod, entity.getStoredProcedure());
processTopic(keyTopic, entity.getEntityName(), ids, repositoryMethod, entity.getStoredProcedure());
}

private void processTopic(String keyTopic, String name, List<Long> ids, Consumer<String> repositoryMethod, String spName) {
Expand All @@ -383,14 +388,14 @@ private void processTopic(String keyTopic, String name, List<Long> ids, Consumer

private <T> List<T> processTopic(String keyTopic, Entity entity, List<Long> ids,
Function<String, List<T>> repositoryMethod) {
String idsString = prepareAndLog(keyTopic, ids, entity.getName(), entity.getStoredProcedure());
String idsString = prepareAndLog(keyTopic, ids, entity.getEntityName(), entity.getStoredProcedure());
List<T> result = repositoryMethod.apply(idsString);
completeLog(entity.getStoredProcedure());
return result;
}

private void processTopic(String keyTopic, Entity entity, Long id, String vals, BiConsumer<Long, String> repositoryMethod) {
logger.info("Processing {} for topic: {}. Calling stored proc: {} '{}', '{}'", StringUtils.capitalize(entity.getName()), keyTopic,
logger.info("Processing {} for topic: {}. Calling stored proc: {} '{}', '{}'", StringUtils.capitalize(entity.getEntityName()), keyTopic,
entity.getStoredProcedure(), id, vals);
repositoryMethod.accept(id, vals);
completeLog(entity.getStoredProcedure());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.List;

import static gov.cdc.etldatapipeline.commonutil.TestUtils.readFileData;
import static gov.cdc.etldatapipeline.postprocessingservice.service.PostProcessingService.Entity.HEPATITIS_DATAMART;
import static gov.cdc.etldatapipeline.postprocessingservice.service.PostProcessingService.Entity.STD_HIV_DATAMART;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.verify;

Expand Down Expand Up @@ -55,16 +57,45 @@ void tearDown() throws Exception {
}

@Test
void testDatamartProcess() throws Exception {
void testHepDatamartProcess() throws Exception {
String topic = "dummy_investigation";
List<InvestigationResult> investigationResults = new ArrayList<>();
InvestigationResult invResult = getInvestigationResult(123L);
InvestigationResult invResult = getInvestigationResult(123L, HEPATITIS_DATAMART.getEntityName(), HEPATITIS_DATAMART.getStoredProcedure());
investigationResults.add(invResult);

datamartProcessor.datamartTopic = topic;
datamartProcessor.process(investigationResults);

Datamart datamart = getDatamart();
Datamart datamart = getDatamart("HepDatamart.json");
DatamartKey datamartKey = new DatamartKey();
datamartKey.setPublicHealthCaseUid(invResult.getPublicHealthCaseUid());

verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());

String actualMessage = messageCaptor.getValue();
String actualKey = keyCaptor.getValue();

var actualReporting = objectMapper.readValue(
objectMapper.readTree(actualMessage).path(PAYLOAD).toString(), Datamart.class);
var actualDatamartKey = objectMapper.readValue(
objectMapper.readTree(actualKey).path(PAYLOAD).toString(), DatamartKey.class);

assertEquals(topic, topicCaptor.getValue());
assertEquals(datamartKey, actualDatamartKey);
assertEquals(datamart, actualReporting);
}

@Test
void testStdDatamartProcess() throws Exception {
String topic = "dummy_investigation";
List<InvestigationResult> investigationResults = new ArrayList<>();
InvestigationResult invResult = getInvestigationResult(123L, STD_HIV_DATAMART.getEntityName(), STD_HIV_DATAMART.getStoredProcedure());
investigationResults.add(invResult);

datamartProcessor.datamartTopic = topic;
datamartProcessor.process(investigationResults);

Datamart datamart = getDatamart("StdDatamart.json");
DatamartKey datamartKey = new DatamartKey();
datamartKey.setPublicHealthCaseUid(invResult.getPublicHealthCaseUid());

Expand All @@ -90,26 +121,28 @@ void testDatamartProcessNoExceptionWhenDataIsNull() {

@Test
void testDatamartProcessException() {
List<InvestigationResult> nullPhcResults = Collections.singletonList(getInvestigationResult(null));
List<InvestigationResult> nullPhcResults = Collections.singletonList(getInvestigationResult(null, HEPATITIS_DATAMART.getEntityName(), HEPATITIS_DATAMART.getStoredProcedure()));
assertThrows(RuntimeException.class, () -> datamartProcessor.process(nullPhcResults));
}

private InvestigationResult getInvestigationResult(Long phcUid) {
private InvestigationResult getInvestigationResult(Long phcUid, String entityName, String storedProcedure) {
InvestigationResult investigationResult = new InvestigationResult();
investigationResult.setPublicHealthCaseUid(phcUid);
investigationResult.setInvestigationKey(100L);
investigationResult.setPatientUid(456L);
investigationResult.setPatientKey(200L);
investigationResult.setConditionCd("10110");
investigationResult.setDatamart("Hepatitis_Datamart");
investigationResult.setStoredProcedure("sp_hepatitis_datamart_postprocessing");
investigationResult.setDatamart(entityName);
investigationResult.setStoredProcedure(storedProcedure);
return investigationResult;
}

private Datamart getDatamart() throws Exception {
String dmJson = readFileData(FILE_PREFIX + "Datamart.json");
JsonNode dmNode = objectMapper.readTree(dmJson);


private Datamart getDatamart(String jsonFile) throws Exception {
String dmJson = readFileData(FILE_PREFIX + jsonFile);
JsonNode dmNode = objectMapper.readTree(dmJson);
return objectMapper.readValue(dmNode.get(PAYLOAD).toString(), Datamart.class);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -408,16 +408,16 @@ void testPostProcessCacheIdsPriority() {
assertTrue(topicLogList.get(4).contains(invTopic));
assertTrue(topicLogList.get(5).contains(invTopic));
assertTrue(topicLogList.get(6).contains(ntfTopic));
assertTrue(topicLogList.get(7).contains(cmTopic));
assertTrue(topicLogList.get(8).contains(cmTopic));
assertTrue(topicLogList.get(9).contains(intTopic));
assertTrue(topicLogList.get(10).contains(intTopic));
assertTrue(topicLogList.get(7).contains(intTopic));
assertTrue(topicLogList.get(8).contains(intTopic));
assertTrue(topicLogList.get(9).contains(cmTopic));
assertTrue(topicLogList.get(10).contains(cmTopic));
assertTrue(topicLogList.get(11).contains(ldfTopic));
assertTrue(topicLogList.get(12).contains(obsTopic));
}

@Test
void testPostProcessDatamart() {
void testPostProcessHepatitsDatamart() {
String topic = "dummy_datamart";
String msg = "{\"payload\":{\"public_health_case_uid\":123,\"patient_uid\":456," +
"\"investigation_key\":100,\"patient_key\":200,\"condition_cd\":\"10110\"," +
Expand All @@ -426,8 +426,24 @@ void testPostProcessDatamart() {
postProcessingServiceMock.postProcessDatamart(topic, msg);
postProcessingServiceMock.processDatamartIds();

verify(investigationRepositoryMock).executeStoredProcForHepDatamart("123", "456");
assertTrue(postProcessingServiceMock.dmCache.containsKey("Hepatitis_Datamart"));
verify(investigationRepositoryMock).executeStoredProcForHepDatamart("123");
assertTrue(postProcessingServiceMock.dmCache.containsKey(PostProcessingService.Entity.HEPATITIS_DATAMART.getEntityName()));
List<ILoggingEvent> logs = listAppender.list;
assertEquals(3, logs.size());
}

@Test
void testPostProcessStdHIVDatamart() {
String topic = "dummy_datamart";
String msg = "{\"payload\":{\"public_health_case_uid\":123,\"patient_uid\":456," +
"\"investigation_key\":100,\"patient_key\":200,\"condition_cd\":\"10110\"," +
"\"datamart\":\"Std_Hiv_Datamart\",\"stored_procedure\":\"sp_std_hiv_datamart_postprocessing\"}}";

postProcessingServiceMock.postProcessDatamart(topic, msg);
postProcessingServiceMock.processDatamartIds();

verify(investigationRepositoryMock).executeStoredProcForStdHIVDatamart("123");
assertTrue(postProcessingServiceMock.dmCache.containsKey(PostProcessingService.Entity.STD_HIV_DATAMART.getEntityName()));
List<ILoggingEvent> logs = listAppender.list;
assertEquals(3, logs.size());
}
Expand Down Expand Up @@ -574,8 +590,8 @@ private List<InvestigationResult> getInvestigationResults(Long phcUid, Long pati
investigationResult.setPatientUid(456L);
investigationResult.setPatientKey(patientKey);
investigationResult.setConditionCd("10110");
investigationResult.setDatamart("Hepatitis_Datamart");
investigationResult.setStoredProcedure("sp_hepatitis_datamart_postprocessing");
investigationResult.setDatamart(PostProcessingService.Entity.HEPATITIS_DATAMART.getEntityName());
investigationResult.setStoredProcedure(PostProcessingService.Entity.HEPATITIS_DATAMART.getStoredProcedure());
investigationResults.add(investigationResult);
return investigationResults;
}
Expand Down
Loading

0 comments on commit dd39e01

Please sign in to comment.