Skip to content

Commit

Permalink
Implement validation and support for json-key-name diff than target c…
Browse files Browse the repository at this point in the history
…olumn-name
  • Loading branch information
pravinbhat committed Aug 22, 2024
1 parent 93dd143 commit 3e1ca92
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
bindValue = explodeMapValue;
} else if (targetIndex == extractJsonFeature.getTargetColumnIndex()) {
int originIndex = extractJsonFeature.getOriginColumnIndex();
bindValue = extractJsonFeature.extract(originRow.getString(originIndex), extractJsonFeature.getTargetColumnName());
bindValue = extractJsonFeature.extract(originRow.getString(originIndex));
} else {
int originIndex = cqlTable.getCorrespondingIndex(targetIndex);
if (originIndex < 0) // we don't have data to bind for this column; continue to the next targetIndex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
}
else if (targetIndex== explodeMapKeyIndex) {
bindValueTarget = explodeMapKey;
}
else if (targetIndex== explodeMapValueIndex) {
} else if (targetIndex== explodeMapValueIndex) {
bindValueTarget = explodeMapValue;
} else if (targetIndex == extractJsonFeature.getTargetColumnIndex()) {
originIndex = extractJsonFeature.getOriginColumnIndex();
bindValueTarget = extractJsonFeature.extract(originRow.getString(originIndex));
} else {
if (originIndex < 0)
// we don't have data to bind for this column; continue to the next targetIndex
Expand All @@ -89,7 +91,7 @@ else if (targetIndex== explodeMapValueIndex) {
logger.error("Error trying to bind value:" + bindValueTarget + " to column:" +
targetColumnNames.get(targetIndex) + " of targetDataType:" + targetColumnTypes.get(targetIndex) + "/"
+ cqlTable.getBindClass(targetIndex).getName() + " at column index:" + targetIndex);
throw e;
throw new RuntimeException("Error trying to bind value: ", e);
}
}

Expand Down
63 changes: 26 additions & 37 deletions src/main/java/com/datastax/cdm/feature/ExtractJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.datastax.cdm.feature;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -36,6 +35,7 @@ public class ExtractJson extends AbstractFeature {
private ObjectMapper mapper = new ObjectMapper();

private String originColumnName = "";
private String originJsonFieldName = "";
private Integer originColumnIndex = -1;

private String targetColumnName = "";
Expand All @@ -47,39 +47,43 @@ public boolean loadProperties(IPropertyHelper helper) {
throw new IllegalArgumentException("helper is null");
}

this.originColumnName = getColumnName(helper, KnownProperties.EXTRACT_JSON_ORIGIN_COLUMN_NAME);
this.targetColumnName = getColumnName(helper, KnownProperties.EXTRACT_JSON_TARGET_COLUMN_NAME);
logger.warn("Origin column name ({}) ", originColumnName);
logger.warn("Target column name ({}) ", targetColumnName);
originColumnName = getColumnName(helper, KnownProperties.EXTRACT_JSON_ORIGIN_COLUMN_NAME);
targetColumnName = getColumnName(helper, KnownProperties.EXTRACT_JSON_TARGET_COLUMN_NAME);
// Convert columnToFieldMapping to targetColumnName and originJsonFieldName
if (!targetColumnName.isBlank()) {
String[] parts = targetColumnName.split("\\:");
if (parts.length == 2) {
targetColumnName = parts[0];
originJsonFieldName = parts[1];
} else {
originJsonFieldName = targetColumnName;
}
}

isValid = validateProperties();
logger.warn("ExtractJson valid ({}) ", isValid);

isEnabled = isValid && !originColumnName.isEmpty() && !targetColumnName.isEmpty();
logger.warn("ExtractJson isEnabled ({}) ", isEnabled);

isLoaded = true;

return isLoaded && isValid;
}

@Override
protected boolean validateProperties() {
isValid = true;
if ((null == originColumnName || originColumnName.isEmpty())
&& (null == targetColumnName || targetColumnName.isEmpty()))
return true;

if (null == originColumnName || originColumnName.isEmpty()) {
logger.error("Origin column name is not set when Target ({}) are set", targetColumnName);
isValid = false;
return false;
}

if (null == targetColumnName || targetColumnName.isEmpty()) {
logger.error("Target column name is not set when Origin ({}) are set", originColumnName);
isValid = false;
return false;
}

return isValid;
return true;
}

@Override
Expand All @@ -94,7 +98,6 @@ public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable)
throw new IllegalArgumentException("Target table is not a target table");
}

isValid = true;
if (!validateProperties()) {
isEnabled = false;
return false;
Expand All @@ -105,39 +108,28 @@ public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable)
// Initialize Origin variables
List<Class> originBindClasses = originTable.extendColumns(Collections.singletonList(originColumnName));
if (null == originBindClasses || originBindClasses.size() != 1 || null == originBindClasses.get(0)) {
logger.error("Origin column {} is not found on the origin table {}", originColumnName,
originTable.getKeyspaceTable());
isValid = false;
throw new IllegalArgumentException("Origin column " + originColumnName
+ " is not found on the origin table " + originTable.getKeyspaceTable());
} else {
this.originColumnIndex = originTable.indexOf(originColumnName);
}

// Initialize Target variables
List<Class> targetBindClasses = targetTable.extendColumns(Arrays.asList(targetColumnName));
List<Class> targetBindClasses = targetTable.extendColumns(Collections.singletonList(targetColumnName));
if (null == targetBindClasses || targetBindClasses.size() != 1 || null == targetBindClasses.get(0)) {
logger.error("Target column {} is not found on the target table {}", targetColumnName,
targetTable.getKeyspaceTable());
isValid = false;
throw new IllegalArgumentException("Target column " + targetColumnName
+ " is not found on the target table " + targetTable.getKeyspaceTable());
} else {
this.targetColumnIndex = targetTable.indexOf(targetColumnName);
}
logger.warn("ExtractJson originColumnIndex ({}) ", originColumnIndex);
logger.warn("ExtractJson targetColumnIndex ({}) ", targetColumnIndex);

if (isEnabled && logger.isTraceEnabled()) {
logger.trace("Origin column {} is at index {}", originColumnName, originColumnIndex);
logger.trace("Target column {} is at index {}", targetColumnName, targetColumnIndex);
}

if (!isValid)
isEnabled = false;
logger.info("Feature {} is {}", this.getClass().getSimpleName(), isEnabled ? "enabled" : "disabled");
return isValid;
return true;
}

public Object extract(String jsonString, String field) throws JsonMappingException, JsonProcessingException {
public Object extract(String jsonString) throws JsonMappingException, JsonProcessingException {
if (StringUtils.isNotBlank(jsonString)) {
return mapper.readValue(jsonString, Map.class).get(field);
return mapper.readValue(jsonString, Map.class).get(originJsonFieldName);
}

return null;
Expand All @@ -155,10 +147,7 @@ public String getTargetColumnName() {
return isEnabled ? targetColumnName : "";
}

public static String getColumnName(IPropertyHelper helper, String colName) {
if (null == helper) {
throw new IllegalArgumentException("helper is null");
}
/**
 * Reads the property {@code colName} from {@code helper} and passes it through
 * {@link CqlTable#unFormatName}.
 *
 * @param helper  property source to read from
 * @param colName property key holding the column name
 * @return the un-formatted column name, or an empty string when the result is {@code null}
 */
private String getColumnName(IPropertyHelper helper, String colName) {
    final String rawValue = helper.getString(colName);
    final String unformatted = CqlTable.unFormatName(rawValue);
    if (unformatted == null) {
        return "";
    }
    return unformatted;
}
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/datastax/cdm/job/DiffJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.datastax.cdm.data.Record;
import com.datastax.cdm.feature.ConstantColumns;
import com.datastax.cdm.feature.ExplodeMap;
import com.datastax.cdm.feature.ExtractJson;
import com.datastax.cdm.feature.Featureset;
import com.datastax.cdm.feature.Guardrail;
import com.datastax.cdm.feature.TrackRun;
Expand All @@ -64,6 +65,7 @@ public class DiffJobSession extends CopyJobSession {
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
boolean logDebug = logger.isDebugEnabled();
boolean logTrace = logger.isTraceEnabled();
private ExtractJson extractJsonFeature;

public DiffJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
super(originSession, targetSession, sc);
Expand Down Expand Up @@ -108,6 +110,8 @@ public DiffJobSession(CqlSession originSession, CqlSession targetSession, SparkC
this.explodeMapValueIndex = -1;
}

extractJsonFeature = (ExtractJson) this.targetSession.getCqlTable().getFeature(Featureset.EXTRACT_JSON);

logger.info("CQL -- origin select: {}", this.originSession.getOriginSelectByPartitionRangeStatement().getCQL());
logger.info("CQL -- target select: {}", this.targetSession.getTargetSelectByPKStatement().getCQL());
logger.info("CQL -- target upsert: {}", this.targetSession.getTargetUpsertStatement().getCQL());
Expand Down Expand Up @@ -285,6 +289,9 @@ private String isDifferent(Record record) {
if (logTrace)
logger.trace("PK {}, targetIndex {} column {} using explodeMapValue stored on PK: {}", pk,
targetIndex, targetColumnNames.get(targetIndex), origin);
} else if (targetIndex == extractJsonFeature.getTargetColumnIndex()) {
originIndex = extractJsonFeature.getOriginColumnIndex();
origin = extractJsonFeature.extract(originRow.getString(originIndex));
} else {
throw new RuntimeException(
"Target column \"" + targetColumnNames.get(targetIndex) + "\" at index " + targetIndex
Expand All @@ -308,6 +315,9 @@ private String isDifferent(Record record) {
diffData.append("Target column:").append(targetColumnNames.get(targetIndex)).append("-origin[")
.append(originContent).append("]").append("-target[").append(targetContent)
.append("]; ");
} else if (null == origin && null != targetAsOriginType) {
diffData.append("Target column:").append(targetColumnNames.get(targetIndex))
.append(" origin is null, target is ").append(targetAsOriginType).append("; ");
}
} catch (Exception e) {
String exceptionName;
Expand Down
22 changes: 12 additions & 10 deletions src/resources/cdm-detailed.properties
Original file line number Diff line number Diff line change
Expand Up @@ -368,20 +368,22 @@ spark.cdm.perfops.ratelimit.target 20000


#===========================================================================================================
# Extract Json Feature allows you to extract a field from Json content in an Origin table and map it to
# a column in the Target table
# Extract Json Feature allows you to extract a json value from a column with json content in an Origin table
# and map it to a column in the Target table
#
# spark.cdm.feature.extractJson
# .origin.name Name of the origin column with json content
# .target.name Name of the target column as well as the key-name within the json content. Note that
# the target column name must be the same as the key-name within the json content that
# you want to extract. The json column in origin table must be of CQL type 'text' while
# the target column can be of any JSON primitive types (String, Number or Boolean) but
# must match the json value that you want to extract. If the json key-name does not exist
# in the json content, the target column will be set to null.
# .target.name Name of the target column where the extracted json value will be written. The json
# column in the origin table must be of CQL type 'text', while the column in the target table
# can be of any primitive JSON type (String, Number or Boolean) but must match the type of
# the json value that you want to extract. If the json key-name does not exist in the json
# content, the target column will be set to null.
# Note: By default, the target column-name is also used as the key-name to look up within
# the json content. If the json key-name is not the same as the target column-name,
# use the format target_columnname:json_keyname
#-----------------------------------------------------------------------------------------------------------
#spark.cdm.feature.extractJson.origin.name content
#spark.cdm.feature.extractJson.target.name name
#spark.cdm.feature.extractJson.origin.name origin_columnname_with_json_content
#spark.cdm.feature.extractJson.target.name target_columnname

#===========================================================================================================
# Guardrail feature manages records that exceed guardrail checks. The Guardrail job will generate a
Expand Down
101 changes: 95 additions & 6 deletions src/test/java/com/datastax/cdm/feature/ExtractJsonTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;

import static org.apache.hadoop.shaded.com.google.common.base.CharMatcher.any;
import static org.junit.jupiter.api.Assertions.*;
Expand Down Expand Up @@ -89,21 +90,39 @@ public void setup() {
}

@Test
public void smokeTest_loadProperties() {
public void loadProperties() {
boolean loaded = feature.loadProperties(propertyHelper);

assertAll(
() -> assertTrue(loaded, "properties are loaded and valid"),
() -> assertTrue(feature.isEnabled()),
() -> assertEquals(standardTargetName, feature.getTargetColumnName(), "age")
() -> assertEquals(standardTargetName, feature.getTargetColumnName())
);
}

/** Passing a null property helper to loadProperties must raise an IllegalArgumentException. */
@Test
public void loadPropertiesException() {
    IllegalArgumentException nullHelperFailure =
            assertThrows(IllegalArgumentException.class, () -> feature.loadProperties(null));
    String message = nullHelperFailure.getMessage();
    assertTrue(message.contains("helper is null"));
}

@Test
public void smokeTest_initializeAndValidate() {
public void loadPropertiesOriginError() {
    // Stub the origin column name property as null and expect loadProperties to report failure.
    when(propertyHelper.getString(KnownProperties.EXTRACT_JSON_ORIGIN_COLUMN_NAME)).thenReturn(null);
    boolean loadedAndValid = feature.loadProperties(propertyHelper);
    assertFalse(loadedAndValid, "Origin column name is not set");
}

/** Stubs the target column name property as null and expects loadProperties to return false. */
@Test
public void loadPropertiesTargetError() {
    when(propertyHelper.getString(KnownProperties.EXTRACT_JSON_TARGET_COLUMN_NAME)).thenReturn(null);
    boolean loadedAndValid = feature.loadProperties(propertyHelper);
    assertFalse(loadedAndValid, "Target column name is not set");
}

@Test
public void initializeAndValidate() {
feature.loadProperties(propertyHelper);

boolean valid = feature.initializeAndValidate(originTable, targetTable);

assertAll(
() -> assertTrue(valid, "configuration is valid"),
() -> assertEquals(standardOriginNames.indexOf(standardOriginName), feature.getOriginColumnIndex(), "origin index"),
Expand All @@ -112,7 +131,17 @@ public void smokeTest_initializeAndValidate() {
}

@Test
public void smokeTest_disabledFeature() {
public void extractNull() throws JsonMappingException, JsonProcessingException {
    // Load and initialize the feature; the boolean result of initializeAndValidate is not
    // needed here, so it is no longer stored in an unused local.
    feature.loadProperties(propertyHelper);
    feature.initializeAndValidate(originTable, targetTable);

    // extract is expected to yield null for: a JSON object holding only a "name" key,
    // an empty JSON object, and an empty string.
    assertNull(feature.extract("{\"name\":\"Pravin\"}"));
    assertNull(feature.extract("{}"));
    assertNull(feature.extract(""));
}

@Test
public void disabledFeature() {
when(propertyHelper.getString(KnownProperties.EXTRACT_JSON_ORIGIN_COLUMN_NAME)).thenReturn("");
when(propertyHelper.getString(KnownProperties.EXTRACT_JSON_TARGET_COLUMN_NAME)).thenReturn("");

Expand All @@ -125,6 +154,66 @@ public void smokeTest_disabledFeature() {
() -> assertEquals(-1, feature.getTargetColumnIndex(), "target index"),
() -> assertEquals(-1, feature.getOriginColumnIndex(), "origin index")
);

when(propertyHelper.getString(KnownProperties.EXTRACT_JSON_TARGET_COLUMN_NAME)).thenReturn(null);
assertEquals("", feature.getTargetColumnName(), "target name");
}

/** A null origin table must make initializeAndValidate throw an IllegalArgumentException. */
@Test
public void initializeAndValidateExceptionOriginNull() {
    IllegalArgumentException failure = assertThrows(IllegalArgumentException.class,
            () -> feature.initializeAndValidate(null, targetTable));
    String message = failure.getMessage();
    assertTrue(message.contains("originTable and/or targetTable is null"));
}

/** A null target table must make initializeAndValidate throw an IllegalArgumentException. */
@Test
public void initializeAndValidateExceptionTargetNull() {
    IllegalArgumentException failure = assertThrows(IllegalArgumentException.class,
            () -> feature.initializeAndValidate(originTable, null));
    String message = failure.getMessage();
    assertTrue(message.contains("originTable and/or targetTable is null"));
}

/**
 * Stubs the origin column name property to "incorrect_column" and expects
 * initializeAndValidate to throw an IllegalArgumentException.
 */
@Test
public void initializeAndValidateExceptionOriginColumn() {
    when(propertyHelper.getString(KnownProperties.EXTRACT_JSON_ORIGIN_COLUMN_NAME)).thenReturn("incorrect_column");

    feature.loadProperties(propertyHelper);

    // Only the exception type is verified, so the result of assertThrows is not kept in a local.
    assertThrows(IllegalArgumentException.class, () -> feature.initializeAndValidate(originTable, targetTable));
}

/**
 * Stubs the target column name property to "incorrect_column" and expects
 * initializeAndValidate to throw an IllegalArgumentException.
 */
@Test
public void initializeAndValidateExceptionTargetColumn() {
    when(propertyHelper.getString(KnownProperties.EXTRACT_JSON_TARGET_COLUMN_NAME)).thenReturn("incorrect_column");

    feature.loadProperties(propertyHelper);

    // Only the exception type is verified, so the result of assertThrows is not kept in a local.
    assertThrows(IllegalArgumentException.class, () -> feature.initializeAndValidate(originTable, targetTable));
}

/** An origin table that reports isOrigin() == false must be rejected with an IllegalArgumentException. */
@Test
public void initializeAndValidateExceptionOriginIncorrect() {
    // Make the origin table mock claim that it is not an origin table.
    when(originTable.isOrigin()).thenReturn(false);

    IllegalArgumentException failure = assertThrows(IllegalArgumentException.class,
            () -> feature.initializeAndValidate(originTable, targetTable));
    String message = failure.getMessage();
    assertTrue(message.contains("Origin table is not an origin table"));
}

/** A target table that reports isOrigin() == true must be rejected with an IllegalArgumentException. */
@Test
public void initializeAndValidateExceptionTargetIncorrect() {
    // Make the target table mock claim that it is an origin table.
    when(targetTable.isOrigin()).thenReturn(true);

    IllegalArgumentException failure = assertThrows(IllegalArgumentException.class,
            () -> feature.initializeAndValidate(originTable, targetTable));
    String message = failure.getMessage();
    assertTrue(message.contains("Target table is not a target table"));
}

// Stubs the origin column name property as an empty string and expects loadProperties and
// initializeAndValidate to both return false, leaving the feature disabled.
// NOTE(review): presumably the target column name is still set by the test setup, which is
// what makes this configuration invalid rather than merely disabled — confirm against setup().
@Test
public void invalidFeature() {
when(propertyHelper.getString(KnownProperties.EXTRACT_JSON_ORIGIN_COLUMN_NAME)).thenReturn("");

// assertAll runs every executable in order, so loadProperties is invoked before
// initializeAndValidate and isEnabled is checked last.
assertAll(
() -> assertFalse(feature.loadProperties(propertyHelper), "loadProperties"),
() -> assertFalse(feature.initializeAndValidate(originTable, targetTable), "initializeAndValidate"),
() -> assertFalse(feature.isEnabled(), "feature should be disabled")
);
}

}
Expand Down

0 comments on commit 3e1ca92

Please sign in to comment.