From d0f97454fee335b823eaef5b2f8264cb045abbed Mon Sep 17 00:00:00 2001 From: cduby Date: Wed, 2 Aug 2023 12:47:57 -0400 Subject: [PATCH 01/19] Datahub 1.10.0 pom changes --- flink-cyber/caracal-parser/pom.xml | 11 ++- .../flink-commands/json-commands/pom.xml | 9 +++ flink-cyber/flink-common/pom.xml | 8 +++ flink-cyber/flink-cyber-api/pom.xml | 19 ++++++ flink-cyber/flink-dedupe/pom.xml | 4 ++ .../flink-enrichment-load/pom.xml | 15 ++--- .../flink-enrichment-lookup-hbase/pom.xml | 11 ++- .../flink-enrichment-lookup-raw/pom.xml | 5 ++ .../flink-enrichment-threatq/pom.xml | 12 ++-- flink-cyber/flink-hbase-common/pom.xml | 7 -- .../flink-indexing-hive/pom.xml | 5 ++ .../flink-indexing-parquet/pom.xml | 5 ++ flink-cyber/flink-profiler-java/pom.xml | 11 --- flink-cyber/flink-profiler/pom.xml | 4 ++ flink-cyber/flink-sessions/pom.xml | 5 ++ .../apache/metron/hbase/mock/MockHTable.java | 22 ++---- .../parser-chains-config-service/pom.xml | 5 ++ flink-cyber/metron-parser-chain/pom.xml | 1 - flink-cyber/pom.xml | 67 +++++++++++++++---- 19 files changed, 152 insertions(+), 74 deletions(-) diff --git a/flink-cyber/caracal-parser/pom.xml b/flink-cyber/caracal-parser/pom.xml index f8b9e6f7c..7277a2580 100644 --- a/flink-cyber/caracal-parser/pom.xml +++ b/flink-cyber/caracal-parser/pom.xml @@ -127,7 +127,11 @@ org.apache.flink flink-cloudera-registry - ${flink.version} + + + + com.hortonworks.registries + schema-registry-serdes @@ -164,6 +168,11 @@ flink-orc + + org.apache.orc + orc-core + + org.apache.parquet parquet-avro diff --git a/flink-cyber/flink-commands/json-commands/pom.xml b/flink-cyber/flink-commands/json-commands/pom.xml index 424ce8a42..69ae76848 100644 --- a/flink-cyber/flink-commands/json-commands/pom.xml +++ b/flink-cyber/flink-commands/json-commands/pom.xml @@ -69,6 +69,15 @@ ${log4j.version} provided + + com.fasterxml.jackson.core + jackson-databind + + + com.google.guava + guava + test + diff --git a/flink-cyber/flink-common/pom.xml b/flink-cyber/flink-common/pom.xml index 52401b821..7d3a0e0f8 100644 --- a/flink-cyber/flink-common/pom.xml +++ b/flink-cyber/flink-common/pom.xml @@ -43,10 +43,18 @@ org.apache.flink flink-connector-kafka + + org.apache.kafka + kafka-clients + org.apache.flink flink-cloudera-registry + + com.hortonworks.registries + schema-registry-serdes + org.apache.avro avro diff --git a/flink-cyber/flink-cyber-api/pom.xml b/flink-cyber/flink-cyber-api/pom.xml index f2548139e..5dddfd865 100644 --- a/flink-cyber/flink-cyber-api/pom.xml +++ b/flink-cyber/flink-cyber-api/pom.xml @@ -32,6 +32,19 @@ + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + org.projectlombok @@ -55,6 +68,12 @@ flink-table-common + + + org.springframework + spring-core + + org.apache.avro avro diff --git a/flink-cyber/flink-dedupe/pom.xml b/flink-cyber/flink-dedupe/pom.xml index 4309f769e..940d9d650 100644 --- a/flink-cyber/flink-dedupe/pom.xml +++ b/flink-cyber/flink-dedupe/pom.xml @@ -47,6 +47,10 @@ org.apache.flink flink-cloudera-registry + + com.hortonworks.registries + schema-registry-serdes + org.projectlombok diff --git a/flink-cyber/flink-enrichment/flink-enrichment-load/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-load/pom.xml index 94d4fe0be..92feb896a 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-load/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-load/pom.xml @@ -72,17 +72,6 @@ org.apache.flink flink-connector-hbase-${flink.hbase.version} - ${flink.version} - - - org.glassfish - javax.el - - - org.apache.hbase - hbase-client - - @@ -90,6 +79,10 @@ flink-cloudera-registry compile + + com.hortonworks.registries + schema-registry-serdes + com.cloudera.cyber diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-hbase/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-lookup-hbase/pom.xml index 0e6eb89a8..f8e3d108d 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-hbase/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-hbase/pom.xml @@ -52,13 +52,6 @@ org.apache.flink flink-connector-hbase-${flink.hbase.version} - ${flink.version} - - - org.glassfish - javax.el - - @@ -67,6 +60,10 @@ provided + + com.hortonworks.registries + schema-registry-serdes + com.cloudera.cyber flink-cyber-api diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-raw/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-lookup-raw/pom.xml index 17b0965be..4128ddfb0 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-raw/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-raw/pom.xml @@ -54,6 +54,11 @@ provided + + com.hortonworks.registries + schema-registry-serdes + + com.cloudera.cyber flink-cyber-api diff --git a/flink-cyber/flink-enrichment/flink-enrichment-threatq/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-threatq/pom.xml index 87db355a9..0ed169baf 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-threatq/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-threatq/pom.xml @@ -47,13 +47,6 @@ org.apache.flink flink-connector-hbase-${flink.hbase.version} - ${flink.version} - - - org.glassfish - javax.el - - @@ -62,6 +55,11 @@ provided + + com.hortonworks.registries + schema-registry-serdes + + com.cloudera.cyber flink-cyber-api diff --git a/flink-cyber/flink-hbase-common/pom.xml b/flink-cyber/flink-hbase-common/pom.xml index 461776d8d..e8180bd16 100644 --- a/flink-cyber/flink-hbase-common/pom.xml +++ b/flink-cyber/flink-hbase-common/pom.xml @@ -31,13 +31,6 @@ org.apache.flink flink-connector-hbase-${flink.hbase.version} - ${flink.version} - - - org.glassfish - javax.el - - org.apache.hbase diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml index 1332f4f07..f57913945 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml +++ b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml @@ -85,6 +85,11 @@ flink-cloudera-registry + + com.hortonworks.registries + schema-registry-serdes + + org.apache.flink flink-json diff --git a/flink-cyber/flink-indexing/flink-indexing-parquet/pom.xml b/flink-cyber/flink-indexing/flink-indexing-parquet/pom.xml index c2db275cb..43fef87dd 100644 --- a/flink-cyber/flink-indexing/flink-indexing-parquet/pom.xml +++ b/flink-cyber/flink-indexing/flink-indexing-parquet/pom.xml @@ -69,6 +69,11 @@ flink-orc + + org.apache.orc + orc-core + + org.apache.parquet parquet-avro diff --git a/flink-cyber/flink-profiler-java/pom.xml b/flink-cyber/flink-profiler-java/pom.xml index 424da0b37..0fa01edeb 100644 --- a/flink-cyber/flink-profiler-java/pom.xml +++ b/flink-cyber/flink-profiler-java/pom.xml @@ -36,17 +36,6 @@ org.apache.flink flink-connector-hbase-${flink.hbase.version} - ${flink.version} - - - org.glassfish - javax.el - - - org.apache.hbase - hbase-client - - com.cloudera.cyber diff --git a/flink-cyber/flink-profiler/pom.xml b/flink-cyber/flink-profiler/pom.xml index af0d296a9..2adb26368 100644 --- a/flink-cyber/flink-profiler/pom.xml +++ b/flink-cyber/flink-profiler/pom.xml @@ -53,6 +53,10 @@ ${assertj.version} test + + com.google.guava + guava + diff --git a/flink-cyber/flink-sessions/pom.xml b/flink-cyber/flink-sessions/pom.xml index 36cf88de1..358f64472 100644 --- a/flink-cyber/flink-sessions/pom.xml +++ b/flink-cyber/flink-sessions/pom.xml @@ -38,6 +38,11 @@ flink-cloudera-registry + + com.hortonworks.registries + schema-registry-serdes + + com.hortonworks.smm monitoring-interceptors diff --git a/flink-cyber/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHTable.java b/flink-cyber/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHTable.java index 05a1f03c8..6e17ae48f 100644 --- a/flink-cyber/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHTable.java +++ b/flink-cyber/metron-hbase-common/src/test/java/org/apache/metron/hbase/mock/MockHTable.java @@ -45,20 +45,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.filter.CompareFilter; @@ -159,6 +146,11 @@ public TableDescriptor getDescriptor() throws IOException { .build(); } + @Override + public RegionLocator getRegionLocator() throws IOException { + return null; + } + @Override public boolean exists(Get get) throws IOException { if (get.getFamilyMap() == null || get.getFamilyMap().size() == 0) { @@ -598,7 +590,7 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, Compa } @Override - public void mutateRow(RowMutations rowMutations) throws IOException { + public Result mutateRow(RowMutations rowMutations) throws IOException { throw new UnsupportedOperationException(); } diff --git a/flink-cyber/metron-parser-chain/parser-chains-config-service/pom.xml b/flink-cyber/metron-parser-chain/parser-chains-config-service/pom.xml index 128bd0685..f1dc1d630 100644 --- a/flink-cyber/metron-parser-chain/parser-chains-config-service/pom.xml +++ b/flink-cyber/metron-parser-chain/parser-chains-config-service/pom.xml @@ -155,6 +155,11 @@ + + io.swagger + swagger-annotations + ${swagger-annotations.version} + diff --git a/flink-cyber/metron-parser-chain/pom.xml b/flink-cyber/metron-parser-chain/pom.xml index ff0597683..18486dcbe 100644 --- a/flink-cyber/metron-parser-chain/pom.xml +++ b/flink-cyber/metron-parser-chain/pom.xml @@ -56,7 +56,6 @@ 1.18.12 - 2.7.6 1.2.2 1.7.15 diff --git a/flink-cyber/pom.xml b/flink-cyber/pom.xml index ccecc9726..34afd393f 100644 --- a/flink-cyber/pom.xml +++ b/flink-cyber/pom.xml @@ -54,20 +54,20 @@ UTF-8 1.8 - 1.15.1-csa1.9.0.1 + 1.16.1-csadh1.10.0.0 2.4 - 7.1.7.0-551 + 7.2.17.0-334 1.18.22 1.18.16.0 21.0 - 1.10.0 - 2.5.0.${cdh.version} + 1.11.1 + 3.4.0.${cdh.version} 2.12 - 2.5.0.${cdh.version} + 3.4.0.${cdh.version} 1.11.4 2.11.2 2.10.1 - 2.1.0.${cdh.version} + 2.3.0.${cdh.version} 1.11.1 1.7.36 2.17.2 @@ -80,13 +80,13 @@ 8.4.1.${cdh.version} 2.13.1 3.1.1.${cdh.version} - 2.2.3.${cdh.version} + 2.4.6.${cdh.version} 3.6.0 1.3.0-incubating 3.13.2 1.1.1 5.9.3 - 5.1.0.${cdh.version} + 5.1.1.${cdh.version} 6.0.0.${cdh.version} 3.4 4.9.3 @@ -108,6 +108,9 @@ 1.10.0 0.1.9 1.14.2 + 1.5.1.${cdh.version} + 2.7.6 + 1.6.0 @@ -156,6 +159,12 @@ org.apache.flink flink-connector-kafka ${flink.version} + + + org.apache.kafka + kafka-clients + + @@ -210,6 +219,12 @@ flink-orc ${flink.version} provided + + + org.apache.orc + orc-core + + @@ -221,10 +236,21 @@ org.apache.flink - flink-connector-hbase + flink-connector-hbase-${flink.hbase.version} ${flink.version} + + + org.apache.hbase + hbase-client + + + + org.apache.orc + orc-core + ${orc.version} + com.github.erosb @@ -241,10 +267,22 @@ org.apache.avro avro + + org.apache.avro + avro + + + com.hortonworks.registries + schema-registry-serdes + org.everit.json org.everit.json.schema + + org.slf4j + log4j-over-slf4j + @@ -373,6 +411,12 @@ com.hortonworks.registries schema-registry-serdes 0.9.1 + + + org.slf4j + log4j-over-slf4j + + @@ -612,11 +656,6 @@ flink-connector-kafka ${flink.version} - - org.apache.flink - flink-cloudera-registry - ${flink.version} - From 054e548235678b14312dddbef8ea8935d5ab795d Mon Sep 17 00:00:00 2001 From: cduby Date: Thu, 10 Aug 2023 18:11:13 -0400 Subject: [PATCH 02/19] [CYB-172] kafka topic created from tableapi kafka connector need ssb properties --- .../flink-indexing-hive/pom.xml | 108 ++++++++++++++++++ .../indexing/hive/util/AvroSchemaUtil.java | 9 +- 2 files changed, 116 insertions(+), 1 deletion(-) diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml index f57913945..a45a0726f 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml +++ b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml @@ -102,6 +102,14 @@ ${hive.version} compile + + org.codehaus.janino + commons-compiler + + + org.codehaus.janino + janino + org.apache.calcite * @@ -114,6 +122,106 @@ commons-cli commons-cli + + org.apache.hive + hive-vector-code-gen + + + org.apache.hive + hive-llap-tez + + + org.apache.hive + hive-shims + + + commons-codec + commons-codec + + + commons-httpclient + commons-httpclient + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.antlr + antlr-runtime + + + org.antlr + ST4 + + + org.apache.ant + ant + + + org.apache.commons + commons-compress + + + org.apache.ivy + ivy + + + org.apache.zookeeper + zookeeper + + + org.apache.curator + apache-curator + + + org.apache.curator + curator-framework + + + org.codehaus.groovy + groovy-all + + + org.apache.calcite + calcite-core + + + org.apache.calcite + calcite-druid + + + org.apache.calcite.avatica + avatica + + + org.apache.calcite + calcite-avatica + + + com.google.code.gson + gson + + + stax + stax-api + + + com.google.guava + guava + + + log4j + log4j + + + log4j + apache-log4j-extras + + + org.slf4j + slf4j-log4j12 + diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java index 563d92288..85ca38c03 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java @@ -28,7 +28,10 @@ public static Schema convertToAvro(List tableColumnList) { //method that converts from flink Schema to avro Schema public static Schema convertToAvro(ResolvedSchema schema) { - SchemaBuilder.FieldAssembler fieldAssembler = SchemaBuilder.record("base").fields(); + SchemaBuilder.FieldAssembler fieldAssembler = SchemaBuilder.record("base") + .prop("ssb.rowtimeAttribute", "ts") + .prop("ssb.watermarkExpression", "`ts` - INTERVAL '30' SECOND") + .fields(); for (Column col : schema.getColumns()) { fieldAssembler = fieldAssembler.name(col.getName()).type().optional().type(AvroSchemaUtil.convertTypeToAvro(col.getName(), col.getDataType().getLogicalType())); @@ -43,7 +46,11 @@ public static void putRowIntoAvro(Row row, GenericRecord record, String fieldNam if (row == null) { value = null; } else { + try { value = convertToAvroObject(record.getSchema().getField(avroFieldName).schema(), row.getField(fieldName)); + } catch (Exception e) { + throw new RuntimeException(String.format("Error converting avro field %s", avroFieldName), e); + } } record.put(avroFieldName, value); System.out.println("fieldName: " + fieldName + " value: " + value); From eeb8f8218dcf1a5e1317ae943a9dcbf456027bb1 Mon Sep 17 00:00:00 2001 From: cduby Date: Wed, 4 Oct 2023 17:42:38 -0400 Subject: [PATCH 03/19] Datahub 1.10.0 pom changes --- flink-cyber/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cyber/pom.xml b/flink-cyber/pom.xml index 34afd393f..d58122b06 100644 --- a/flink-cyber/pom.xml +++ b/flink-cyber/pom.xml @@ -54,7 +54,7 @@ UTF-8 1.8 - 1.16.1-csadh1.10.0.0 + 1.16.2-csadh1.10.0.100 2.4 7.2.17.0-334 1.18.22 From 28f8f9a5911b57b8796e510199b6442909a6d3f9 Mon Sep 17 00:00:00 2001 From: cduby Date: Wed, 31 Jan 2024 10:39:24 -0500 Subject: [PATCH 04/19] [CYB-190] user can specify names of CDP cloud artifacts in example create_datahub_config.sh setup script --- .../examples/setup/create_datahub_config.sh | 140 +++++++++++++----- 1 file changed, 101 insertions(+), 39 deletions(-) diff --git a/flink-cyber/cyber-jobs/src/main/resources/examples/setup/create_datahub_config.sh b/flink-cyber/cyber-jobs/src/main/resources/examples/setup/create_datahub_config.sh index b198b6c2d..903d0ab93 100755 --- a/flink-cyber/cyber-jobs/src/main/resources/examples/setup/create_datahub_config.sh +++ b/flink-cyber/cyber-jobs/src/main/resources/examples/setup/create_datahub_config.sh @@ -1,63 +1,125 @@ +#!/usr/bin/env bash + +report_usage_error() { + echo "$(basename $0) $1" >&2 + exit 2 +} + +report_fail() { + echo "$(basename $0) ERROR: $1" >&2 + exit 2 +} + +report_info() { + echo "INFO: $1" >&2 +} + if [[ $# -ne 2 ]]; then - echo "$(basename $0) " >&2 - exit 2 + report_usage_error " " fi -echo "When prompted, enter your workload user password." get_dh_name() { cdp datahub list-clusters | jq -r '.clusters[] | select(.clusterName | contains ("'"$1"'")) | select(.workloadType | contains ("'$2'")) | .clusterName ' } +# read_properties_into_variables +# Read the name value pairs +# split at first equals to key and value +# convert property name to legal shell variable name +# set shell variable to property variable +function read_properties_into_variables() { + while read -r line; do + [[ "$line" =~ ^([[:space:]]*|[[:space:]]*#.*)$ ]] && continue + value=${line#*=} + key=${line%"=$value"} + key=$(echo $key | tr '.' '_') + eval ${key}=${value} + done <$1 +} + env_name=$1 -cluster_prefix=$2 +properties_file=$2 + +if [[ -f "$properties_file" ]]; then + read_properties_into_variables "$properties_file" +else + report_fail "Properties file $properties_file can't be read. Check path and permissions." +fi + +echo "When prompted, enter your workload user password." + +found_env=$(cdp environments list-environments | jq -r '.environments[] | select(.environmentName=="'"${env_name}"'") | .environmentName') +if [[ -z "${found_env}" ]]; then + report_fail "Environment ${env_name} does not exist." +fi + config_dir=../pipelines workload_user=$(cdp iam get-user | jq -r '.user.workloadUsername') # discover hive configs -hive_dh=$(get_dh_name "${cluster_prefix}" "Hive") -if [[ ! -z "$hive_dh" ]]; then - hive_zip=$config_dir/hive-conf.zip - hive_conf="$config_dir/hive-conf" - hive_cm_api=$(cdp datahub describe-cluster --cluster-name "$hive_dh" | jq -r '.cluster.endpoints.endpoints[] | select (.serviceName | contains("CM-API")) | .serviceUrl') - echo resetting hive configs from datahub ${hive_dh} - rm -f "$hive_zip" - rm -rf "$hive_conf" - curl -S -s -o "${hive_zip}" -u "${workload_user}" ${hive_cm_api}/v41/clusters/${hive_dh}/services/hive_on_tez/clientConfig - if [[ -f "$hive_zip" ]]; then - tar -zxvf "$hive_zip" -C "$config_dir" - rm -f "$hive_conf/core-site.xml" - rm -f "$hive_conf/yarn-site.xml" +if [[ ! -z "${hive_datahub_name}" ]]; then + hive_dh=$(get_dh_name "${hive_datahub_name}" "Hive") + if [[ ! -z "$hive_dh" ]]; then + hive_zip=$config_dir/hive-conf.zip + hive_conf="$config_dir/hive-conf" + hive_cm_api=$(cdp datahub describe-cluster --cluster-name "$hive_dh" | jq -r '.cluster.endpoints.endpoints[] | select (.serviceName | contains("CM-API")) | .serviceUrl') + report_info "resetting hive configs from datahub ${hive_dh}" + rm -f "$hive_zip" + rm -rf "$hive_conf" + curl -S -s -o "${hive_zip}" -u "${workload_user}" ${hive_cm_api}/v41/clusters/${hive_dh}/services/hive_on_tez/clientConfig + if [[ -f "$hive_zip" ]]; then + tar -zxvf "$hive_zip" -C "$config_dir" + rm -f "$hive_conf/core-site.xml" + rm -f "$hive_conf/yarn-site.xml" + else + report_fail "Could not get hive configuration." + fi else - echo "ERROR: could not get hive configuration." - exit 2 + report_fail "Hive datahub '${hive_datahub_name}' not found in environment '${env_name}'" fi +else + report_info "Hive is not configured. Property hive_datahub_name not defined in properties file" fi -# discover kafka connection config -kafka_dh_name=$(get_dh_name "${cluster_prefix}" "Kafka") + + +# discover kafka connection config +if [[ ! -z "${kafka_datahub_name}" ]]; then + kafka_dh_name=$(get_dh_name "${kafka_datahub_name}" "Kafka") + if [[ -z "${kafka_dh_name}" ]]; then + report_fail "Environment '${env_name}' does not contain a datahub named '${kafka_datahub_name}' containing Kafka" + fi +else + report_fail "Kafka is not configured. Property kafka_datahub_name not defined in properties file" +fi schema_registry=$(cdp datahub describe-cluster --cluster-name ${kafka_dh_name} | jq -r '.cluster.instanceGroups[] | select(.name | contains("master")) | .instances[].fqdn') kafka_broker=$(cdp datahub describe-cluster --cluster-name ${kafka_dh_name} | jq -r '.cluster.endpoints.endpoints[] | select (.serviceName | contains("KAFKA_BROKER")) | .serviceUrl' | sed 's/ //g') # opdb (hbase and phoenix) connection config -opdb_cluster_name=$(cdp opdb list-databases --environment-name ${env_name} | jq -r '.databases[] | select(.databaseName | contains ("'"${cluster_prefix}"'")) | .databaseName') -phoenix_query_server_host=NO_OPDB_CLUSTER -if [[ ! -z "$opdb_cluster_name" ]]; then - echo resetting opdb configs from datahub ${opdb_cluster_name} - opdb_client_url=$(cdp opdb describe-client-connectivity --environment-name ${env_name} --database-name ${opdb_cluster_name} | jq -r '.connectors[] | select(.name=="hbase") | .configuration.clientConfigurationDetails[].url') - hbase_zip="$config_dir/hbase-config.zip" - hbase_conf="$config_dir/hbase-conf" - rm -f "$hbase_zip" - rm -rf "$hbase_conf" - curl -S -s -f -o "$hbase_zip" -u "${workload_user}" "${opdb_client_url}" - if [[ -f "$hbase_zip" ]]; then - tar -zxvf "$hbase_zip" -C "$config_dir" - else - echo "ERROR: could not get hbase configuration." - exit 2 +if [[ ! -z "${opdb_database_name}" ]]; then + opdb_cluster_name=$(cdp opdb list-databases --environment-name ${env_name} | jq -r '.databases[] | select(.databaseName | contains ("'"${opdb_database_name}"'")) | .databaseName') + phoenix_query_server_host=NO_OPDB_CLUSTER + if [[ ! -z "$opdb_cluster_name" ]]; then + report_info "Resetting OPDB configs from datahub ${opdb_cluster_name}" + opdb_client_url=$(cdp opdb describe-client-connectivity --environment-name ${env_name} --database-name ${opdb_cluster_name} | jq -r '.connectors[] | select(.name=="hbase") | .configuration.clientConfigurationDetails[].url') + hbase_zip="$config_dir/hbase-config.zip" + hbase_conf="$config_dir/hbase-conf" + rm -f "$hbase_zip" + rm -rf "$hbase_conf" + curl -S -s -f -o "$hbase_zip" -u "${workload_user}" "${opdb_client_url}" + if [[ -f "$hbase_zip" ]]; then + tar -zxvf "$hbase_zip" -C "$config_dir" + else + report_fail "Could not get HBase configuration." + fi + base_opdb_services_url=$(echo ${opdb_client_url} | sed -e 's/hbase\/clientConfig//') + report_info "getting Phoenix connection settings" + phoenix_query_server_host=$(curl -S -s -u ${workload_user} ${base_opdb_services_url}/phoenix/roles | jq -r '.items[] | select (.type | contains("PHOENIX_QUERY_SERVER")) | .hostRef.hostname') + else + report_fail "OPDB database ${hive_datahub_name} not found in environment ${env_name}" fi - base_opdb_services_url=$(echo ${opdb_client_url} | sed -e 's/hbase\/clientConfig//') - echo "getting phoenix connection settings" - phoenix_query_server_host=$(curl -S -s -u ${workload_user} ${base_opdb_services_url}/phoenix/roles | jq -r '.items[] | select (.type | contains("PHOENIX_QUERY_SERVER")) | .hostRef.hostname') +else + report_info "HBase and Phoenix are not configured. Property opdb_database_name not defined in properties file" >&2 fi cdp environments get-keytab --environment-name $env_name | jq -r '.contents' | base64 --decode > ${config_dir}/krb5.keytab From a41c542c893414abfe8eddb32f97df433148a27e Mon Sep 17 00:00:00 2001 From: cduby Date: Thu, 1 Feb 2024 12:07:01 -0500 Subject: [PATCH 05/19] [CYB-190] user can specify names of CDP cloud artifacts in example create_datahub_config.sh setup script --- .../src/main/resources/examples/README.md | 73 ++++++++++--------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/flink-cyber/cyber-jobs/src/main/resources/examples/README.md b/flink-cyber/cyber-jobs/src/main/resources/examples/README.md index d926d5a5b..9ddbe829d 100644 --- a/flink-cyber/cyber-jobs/src/main/resources/examples/README.md +++ b/flink-cyber/cyber-jobs/src/main/resources/examples/README.md @@ -43,7 +43,7 @@ Install the following services on the CDP Base Cluster. #### CDP Public Cloud -Provision the resources below in the same CDP Environment and use the same prefix at the begining of each resource name: +Provision the resources below in the same CDP Environment: | Resource Type | Configuration | Basic | Full | | -----------------| ------| ---- | ------| @@ -66,7 +66,7 @@ Provision the resources below in the same CDP Environment and use the same prefi CYBERSEC-2.3.1-1.16.1-csadh1.10.0.0-cdh7.2.17.0-334-2308141830/meta/ ... ``` -4. Create a link from CYBERSEC to the parcel directory. +4\. Create a link from CYBERSEC to the parcel directory. ```shell script [cduby@cduby-csa-081423-master0 ~]$ ln -s CYBERSEC-2.3.1-1.16.1-csadh1.10.0.0-cdh7.2.17.0-334-2308141830 CYBERSEC [cduby@cduby-csa-081423-master0 ~]$ ls -ld CYBERSEC @@ -74,18 +74,18 @@ lrwxrwxrwx. 1 cduby cduby 62 Aug 14 20:41 CYBERSEC -> CYBERSEC-2.3.1-1.16.1-csad [cduby@cduby-csa-081423-master0 ~]$ ls CYBERSEC bin etc jobs lib meta tools ``` -5. Edit the shell configuration defining the PATH variable. For example, edit .bash_profile. +5\. Edit the shell configuration defining the PATH variable. For example, edit .bash_profile. ```shell script ## The shell config file, maybe different. Locate the definition of PATH in your configs. [cduby@cduby-csa-081423-master0 ~]$ vi .bash_profile ``` -6. Add $HOME/CYBERSEC/bin to the PATH +6\. Add $HOME/CYBERSEC/bin to the PATH ```shell script ### this is an example, use your path here PATH=$PATH:$HOME/CYBERSEC/bin:$HOME/.local/bin:$HOME/bin export PATH ``` -7. Source the shell config or log out and log back in again to refresh the shell settings. Check the availability of the cybersec commands in the path. +7\. Source the shell config or log out and log back in again to refresh the shell settings. Check the availability of the cybersec commands in the path. ```shell script [cduby@cduby-csa-081423-master0 ~]$ source .bash_profile [cduby@cduby-csa-081423-master0 ~]$ which cs-restart-parser @@ -102,52 +102,59 @@ export PATH #### CDP Base 1. Copy the files in examples/setup/templates to example/pipelines - ``` cd cybersec/flink-cyber/ ``` 2. Edit the .properties files in example/pipelines with the correct settings for the cluster. -2. If the Hbase service is not in the same cluster as Flink, download the Hbase client configs from Cloudera Manager. Move the hbase config zip to the pipelines directory. Unzip the hbase configuration files. -3. If the Hive service is not in the same cluster as Flink, download the Hive on tez client configs from Cloudera Manager. Move the hive config zip to the pipelines directory. Unzip the hive config files. -4. If using a separate Hive cluster, remove the hive_conf/core-site.xml and hive-conf/yarn-site.xml files. +3. If the Hbase service is not in the same cluster as Flink, download the Hbase client configs from Cloudera Manager. Move the hbase config zip to the pipelines directory. Unzip the hbase configuration files. +4. If the Hive service is not in the same cluster as Flink, download the Hive on tez client configs from Cloudera Manager. Move the hive config zip to the pipelines directory. Unzip the hive config files. +5. If using a separate Hive cluster, remove the hive_conf/core-site.xml and hive-conf/yarn-site.xml files. #### CDP Public Cloud 1. If necessary, install the [CDP CLI client](https://docs.cloudera.com/cdp-public-cloud/cloud/cli/topics/mc-cli-client-setup.html). -2. Run the command line ./create_datahub_config.sh . When prompted enter your workload password. +2. [Install the jq package](https://jqlang.github.io/jq/download/). +3. Create a properties file with the names of the CDP cloud resources. +```shell script +hive_datahub_name=name_of_hive_datahub +kafka_datahub_name=name_of_kafka_datahub +opdb_database_name=name_of_operational_db +``` +Omit any lines for the hive datahubs or operational DB. The minimal properties files is shown below: +```shell script +kafka_datahub_name=name_of_kafka_datahub +``` +4\. Run the command line ./create_datahub_config.sh . When prompted enter your workload password. ```shell script cduby@cduby-MBP16-21649 examples % cd cybersec/flink-cyber/cyber-jobs/src/main/resources/examples/setup -cduby@cduby-MBP16-21649 setup % ./create_datahub_config.sh se-sandboxx-aws cduby -cleaning up hive configs +cduby@cduby-MBP16-21649 setup % ./create_datahub_config.sh se-sandboxx-aws datahub_setup_kafka_hive_opdb.properties +When prompted, enter your workload user password. +INFO: resetting hive configs from datahub de-cduby-013024 Enter host password for user 'cduby': - % Total % Received % Xferd Average Speed Time Time Time Current - Dload Upload Total Spent Left Speed -100 11102 0 11102 0 0 2870 0 --:--:-- 0:00:03 --:--:-- 2876 -x hive-conf/mapred-site.xml -x hive-conf/hdfs-site.xml -x hive-conf/hive-site.xml -x hive-conf/atlas-application.properties -x hive-conf/log4j.properties x hive-conf/hadoop-env.sh +x hive-conf/hdfs-site.xml x hive-conf/log4j2.properties -x hive-conf/redaction-rules.json -x hive-conf/core-site.xml +x hive-conf/beeline-site.xml x hive-conf/yarn-site.xml +x hive-conf/mapred-site.xml +x hive-conf/hive-site.xml +x hive-conf/atlas-application.properties x hive-conf/hive-env.sh -x hive-conf/beeline-site.xml +x hive-conf/core-site.xml +x hive-conf/redaction-rules.json +x hive-conf/log4j.properties +INFO: Resetting OPDB configs from datahub ckdodb-013024 Enter host password for user 'cduby': - % Total % Received % Xferd Average Speed Time Time Time Current - Dload Upload Total Spent Left Speed -100 5828 100 5828 0 0 5394 0 0:00:01 0:00:01 --:--:-- 5436 -x hbase-conf/hdfs-site.xml -x hbase-conf/atlas-application.properties x hbase-conf/hbase-omid-client-config.yml +x hbase-conf/hbase-site.xml +x hbase-conf/jaas.conf +x hbase-conf/hdfs-site.xml x hbase-conf/hbase-env.sh x hbase-conf/core-site.xml +x hbase-conf/atlas-application.properties x hbase-conf/log4j.properties -x hbase-conf/hbase-site.xml -x hbase-conf/jaas.conf +INFO: getting Phoenix connection settings +Enter host password for user 'cduby': Certificate was added to keystore -PRINCIPAL=cduby@SE-SANDB.A465-9Q4K.CLOUDERA.SITE ``` ### (Optional) Download Maxmind Database 1. Optionally download the binary (mmdb) version of the [Maxmind GeoLite2 City and ASN Databases](https://dev.maxmind.com/geoip/geolite2-free-geolocation-data). Create a Maxmind account login if you don't have one already. If you don't download the databases, the triaging job will operate but events will not have geocode (country, city, lat, lon) or asn (network) enrichments. @@ -155,7 +162,7 @@ PRINCIPAL=cduby@SE-SANDB.A465-9Q4K.CLOUDERA.SITE ```shell script cduby@cduby-MBP16-21649 templates % cp GeoLite2-*.tar.gz examples/setup ``` -3. Scp the examples directory tree to the flink gateway host. +3\. Scp the examples directory tree to the flink gateway host. ```shell script scp -r examples @:/home/ ``` @@ -193,7 +200,7 @@ no mock server running 100 138 100 138 0 0 874 0 --:--:-- --:--:-- --:--:-- 878 ``` -3. Start the basic pipeline. If you have all the required services installed, start the full pipeline. +3\. Start the basic pipeline. If you have all the required services installed, start the full pipeline. ```shell script ./start_basic.sh ## if all required services are installed From 2a262dd35c35e197eebf1697ed61e9c8fe208460 Mon Sep 17 00:00:00 2001 From: cduby Date: Thu, 1 Feb 2024 12:12:10 -0500 Subject: [PATCH 06/19] [CYB-191] schemas produced for topics can't be consumed by SMM because of watermark properties --- .../com/cloudera/cyber/scoring/ScoredMessage.java | 2 +- .../src/main/java/com/cloudera/cyber/Message.java | 3 ++- .../java/com/cloudera/cyber/avro/AvroSchemas.java | 14 +++++++++++--- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/flink-cyber/flink-alert-scoring-api/src/main/java/com/cloudera/cyber/scoring/ScoredMessage.java b/flink-cyber/flink-alert-scoring-api/src/main/java/com/cloudera/cyber/scoring/ScoredMessage.java index f38eaab00..144aafabd 100644 --- a/flink-cyber/flink-alert-scoring-api/src/main/java/com/cloudera/cyber/scoring/ScoredMessage.java +++ b/flink-cyber/flink-alert-scoring-api/src/main/java/com/cloudera/cyber/scoring/ScoredMessage.java @@ -54,7 +54,7 @@ public long getTs() { return message.getTs(); } - public static final Schema SCHEMA$ = AvroSchemas.createRecordBuilder(ScoredMessage.class.getPackage().getName(), ScoredMessage.class.getName()) + public static final Schema SCHEMA$ = AvroSchemas.createRecordBuilder(ScoredMessage.class.getPackage().getName(), ScoredMessage.class.getName(), null) .fields() .name("message").type(Message.SCHEMA$).noDefault() .name("cyberScoresDetails").type(Schema.createArray(Scores.SCHEMA$)).noDefault() diff --git a/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/Message.java b/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/Message.java index 44e5dc8c1..ea7e61e16 100644 --- a/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/Message.java +++ b/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/Message.java @@ -18,6 +18,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.specific.SpecificRecord; @@ -56,7 +57,7 @@ public class Message extends SpecificRecordBase implements SpecificRecord, Ident public static final Schema SCHEMA$ = AvroSchemas.createRecordBuilder(Message.class.getPackage().getName(), Message.class.getName()) .fields() .requiredString("id") - .requiredLong("ts") + .name("ts").type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))).noDefault() .name("originalSource").type(SignedSourceKey.SCHEMA$).noDefault() .requiredString("message") .name("threats").type().optional().type(SchemaBuilder.map().values(SchemaBuilder.array().items(ThreatIntelligence.SCHEMA$))) diff --git a/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/avro/AvroSchemas.java b/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/avro/AvroSchemas.java index 9bec6f190..8309f15ac 100644 --- a/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/avro/AvroSchemas.java +++ b/flink-cyber/flink-cyber-api/src/main/java/com/cloudera/cyber/avro/AvroSchemas.java @@ -5,8 +5,16 @@ public class AvroSchemas { public static SchemaBuilder.RecordBuilder createRecordBuilder(String namespace, String recordName) { - return SchemaBuilder.record(recordName).namespace(namespace) - .prop("ssb.rowtimeAttribute", "ts") - .prop("ssb.watermarkExpression", "`ts` - INTERVAL '30' SECOND"); + return createRecordBuilder(namespace, recordName, "ts"); + } + + public static SchemaBuilder.RecordBuilder createRecordBuilder(String namespace, String recordName, String tsFieldName) { + SchemaBuilder.RecordBuilder recordBuilder = SchemaBuilder.record(recordName).namespace(namespace); + if (tsFieldName != null) { + recordBuilder + .prop("ssb.rowtimeAttribute", tsFieldName) + .prop("ssb.watermarkExpression", String.format("`%s` - INTERVAL '30' SECOND", tsFieldName)); + } + return recordBuilder; } } From b061055e20ceb1cf985ae143e4264bab4216c28c Mon Sep 17 00:00:00 2001 From: Stas Panasiuk Date: Thu, 21 Mar 2024 09:23:44 -0400 Subject: [PATCH 07/19] [CYB-206] dev log removed --- .../cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java index 413638750..16461b330 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/util/AvroSchemaUtil.java @@ -14,11 +14,7 @@ import org.apache.flink.types.Row; import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; public class AvroSchemaUtil { @@ -52,7 +48,6 @@ public static void putRowIntoAvro(Row row, GenericRecord record, String fieldNam } } record.put(avroFieldName, value); - System.out.println("fieldName: " + fieldName + " value: " + value); } private static Object convertToAvroObject(Schema fieldSchema, Object value) { From d82f591fb3bfa66d8f57b50f26bc1a73bdd90d4c Mon Sep 17 00:00:00 2001 From: cduby Date: Fri, 22 Mar 2024 13:55:47 -0400 Subject: [PATCH 08/19] [CYB-207] cybersec toolkit supports CSA 1.12.0 with CDP 7.1.9 --- flink-cyber/caracal-generator/pom.xml | 7 ++ flink-cyber/caracal-parser/pom.xml | 2 +- flink-cyber/flink-alert-scoring/pom.xml | 5 + .../flink-commands/scoring-commands/pom.xml | 18 ++++ flink-cyber/flink-common/pom.xml | 7 +- flink-cyber/flink-dedupe/pom.xml | 3 +- .../flink-enrichment-cidr/pom.xml | 5 + .../flink-enrichment-geocode/pom.xml | 5 + .../flink-enrichment-load/pom.xml | 3 +- .../flink-enrichment-lookup-common/pom.xml | 8 ++ .../flink-enrichment-lookup-hbase/pom.xml | 3 +- .../flink-enrichment-lookup-raw/pom.xml | 2 +- .../flink-enrichment-lookup-rest/pom.xml | 7 +- .../flink-enrichment-threatq/pom.xml | 2 +- .../flink-indexing-hive/pom.xml | 36 ++++++- .../flink-indexing-parquet/pom.xml | 2 +- .../flink-indexing-solr/pom.xml | 13 ++- flink-cyber/flink-indexing/pom.xml | 1 - flink-cyber/flink-profiler-java/pom.xml | 8 ++ flink-cyber/flink-sessions/pom.xml | 2 +- flink-cyber/flink-stellar/pom.xml | 2 +- .../parser-chains-parsers/pom.xml | 3 +- flink-cyber/metron-parser-chain/pom.xml | 1 - flink-cyber/parser-chains-flink/pom.xml | 102 ++++++++++++------ flink-cyber/pom.xml | 54 ++++++---- 25 files changed, 228 insertions(+), 73 deletions(-) diff --git a/flink-cyber/caracal-generator/pom.xml b/flink-cyber/caracal-generator/pom.xml index a40060e5d..f825b12e9 100644 --- a/flink-cyber/caracal-generator/pom.xml +++ b/flink-cyber/caracal-generator/pom.xml @@ -33,6 +33,13 @@ flink-connector-kafka + + org.apache.flink + flink-clients + provided + ${flink.version} + + com.hortonworks.smm monitoring-interceptors diff --git a/flink-cyber/caracal-parser/pom.xml b/flink-cyber/caracal-parser/pom.xml index 7277a2580..45f034690 100644 --- a/flink-cyber/caracal-parser/pom.xml +++ b/flink-cyber/caracal-parser/pom.xml @@ -126,7 +126,7 @@ org.apache.flink - flink-cloudera-registry + flink-connector-cloudera-registry diff --git a/flink-cyber/flink-alert-scoring/pom.xml b/flink-cyber/flink-alert-scoring/pom.xml index 91a09d67c..1b6a740e0 100644 --- a/flink-cyber/flink-alert-scoring/pom.xml +++ b/flink-cyber/flink-alert-scoring/pom.xml @@ -95,6 +95,11 @@ org.apache.flink flink-statebackend-rocksdb + + com.google.guava + guava + test + diff --git a/flink-cyber/flink-commands/scoring-commands/pom.xml b/flink-cyber/flink-commands/scoring-commands/pom.xml index ccfe8bc9f..f8d2443b8 100644 --- a/flink-cyber/flink-commands/scoring-commands/pom.xml +++ b/flink-cyber/flink-commands/scoring-commands/pom.xml @@ -55,6 +55,14 @@ ${jackson.datatype.version} compile + + com.hortonworks.registries + schema-registry-serdes + + + com.google.guava + guava + org.slf4j @@ -85,6 +93,10 @@ avro compile + + org.apache.flink + flink-java + @@ -123,6 +135,12 @@ + + + com.google. + com.cloudera.cyber.com.google. + + com.cloudera.cyber.scoring.ScoringJobKafka diff --git a/flink-cyber/flink-common/pom.xml b/flink-cyber/flink-common/pom.xml index 7d3a0e0f8..53f49a4a7 100644 --- a/flink-cyber/flink-common/pom.xml +++ b/flink-cyber/flink-common/pom.xml @@ -49,12 +49,17 @@ org.apache.flink - flink-cloudera-registry + flink-connector-cloudera-registry com.hortonworks.registries schema-registry-serdes + + org.apache.flink + flink-connector-base + ${flink.version} + org.apache.avro avro diff --git a/flink-cyber/flink-dedupe/pom.xml b/flink-cyber/flink-dedupe/pom.xml index 940d9d650..17afd33b6 100644 --- a/flink-cyber/flink-dedupe/pom.xml +++ b/flink-cyber/flink-dedupe/pom.xml @@ -45,8 +45,9 @@ org.apache.flink - flink-cloudera-registry + flink-connector-cloudera-registry + com.hortonworks.registries schema-registry-serdes diff --git a/flink-cyber/flink-enrichment/flink-enrichment-cidr/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-cidr/pom.xml index d21ad7006..deda184cf 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-cidr/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-cidr/pom.xml @@ -90,6 +90,11 @@ tests + + com.google.guava + guava + + com.hortonworks.smm monitoring-interceptors diff --git a/flink-cyber/flink-enrichment/flink-enrichment-geocode/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-geocode/pom.xml index 2fb92f1e9..26c2abf46 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-geocode/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-geocode/pom.xml @@ -156,6 +156,11 @@ ${commons-validator.version} + + com.google.guava + guava + + org.assertj assertj-core diff --git a/flink-cyber/flink-enrichment/flink-enrichment-load/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-load/pom.xml index 92feb896a..e2cff6357 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-load/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-load/pom.xml @@ -76,9 +76,10 @@ org.apache.flink - flink-cloudera-registry + flink-connector-cloudera-registry compile + com.hortonworks.registries schema-registry-serdes diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-common/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-lookup-common/pom.xml index 2c989478e..02b05df64 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-common/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-common/pom.xml @@ -39,6 +39,14 @@ lombok provided + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-hbase/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-lookup-hbase/pom.xml index f8e3d108d..12b1ebb80 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-hbase/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-hbase/pom.xml @@ -56,7 +56,7 @@ org.apache.flink - flink-cloudera-registry + flink-connector-cloudera-registry provided @@ -64,6 +64,7 @@ com.hortonworks.registries schema-registry-serdes + com.cloudera.cyber flink-cyber-api diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-raw/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-lookup-raw/pom.xml index 4128ddfb0..af287c78e 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-raw/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-raw/pom.xml @@ -50,7 +50,7 @@ org.apache.flink - flink-cloudera-registry + flink-connector-cloudera-registry provided diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml index aff14970e..6287bc60f 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml @@ -51,10 +51,15 @@ org.apache.flink - flink-cloudera-registry + flink-connector-cloudera-registry provided + + com.hortonworks.registries + schema-registry-serdes + + com.cloudera.cyber flink-cyber-api diff --git a/flink-cyber/flink-enrichment/flink-enrichment-threatq/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-threatq/pom.xml index 0ed169baf..dadba9cfb 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-threatq/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-threatq/pom.xml @@ -51,7 +51,7 @@ org.apache.flink - flink-cloudera-registry + flink-connector-cloudera-registry provided diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml index 5464be069..fa1c75fe5 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml +++ b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml @@ -57,22 +57,42 @@ ${project.parent.version} + + org.apache.flink + flink-clients + ${flink.version} + provided + + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + org.apache.flink flink-streaming-java + + + com.esotericsoftware.kryo + kryo + + org.apache.flink - flink-table-api-java-bridge + flink-table-api-java provided + ${flink.version} org.apache.flink - flink-table-planner_${scala.binary.version} + flink-table-planner-loader ${flink.version} provided @@ -89,7 +109,7 @@ org.apache.flink - flink-cloudera-registry + flink-connector-cloudera-registry @@ -101,6 +121,10 @@ org.apache.flink flink-json + + com.google.guava + guava + @@ -350,6 +374,10 @@ 2.2.0 test + + org.apache.flink + flink-table-api-java-bridge + @@ -360,7 +388,7 @@ org.apache.maven.plugins maven-shade-plugin - 3.4.1 + ${maven-shade-plugin.version} diff --git a/flink-cyber/flink-indexing/flink-indexing-parquet/pom.xml b/flink-cyber/flink-indexing/flink-indexing-parquet/pom.xml index 43fef87dd..4dff1411d 100644 --- a/flink-cyber/flink-indexing/flink-indexing-parquet/pom.xml +++ b/flink-cyber/flink-indexing/flink-indexing-parquet/pom.xml @@ -100,7 +100,7 @@ org.apache.maven.plugins maven-shade-plugin - 3.0.0 + ${maven-shade-plugin.version} diff --git a/flink-cyber/flink-indexing/flink-indexing-solr/pom.xml b/flink-cyber/flink-indexing/flink-indexing-solr/pom.xml index f9c7e74dc..bf3db722e 100644 --- a/flink-cyber/flink-indexing/flink-indexing-solr/pom.xml +++ b/flink-cyber/flink-indexing/flink-indexing-solr/pom.xml @@ -85,6 +85,11 @@ flink-connector-kafka + + com.google.guava + guava + + org.apache.flink flink-test-utils @@ -100,7 +105,7 @@ org.apache.maven.plugins maven-shade-plugin - 3.0.0 + ${maven-shade-plugin.version} @@ -130,6 +135,12 @@ + + + com.google. + com.cloudera.cyber.com.google. + + diff --git a/flink-cyber/flink-indexing/pom.xml b/flink-cyber/flink-indexing/pom.xml index 8f3c54d15..d9ab934f4 100644 --- a/flink-cyber/flink-indexing/pom.xml +++ b/flink-cyber/flink-indexing/pom.xml @@ -27,7 +27,6 @@ flink-indexing-parquet flink-indexing-solr - flink-indexing-elastic flink-indexing-search flink-indexing-hive diff --git a/flink-cyber/flink-profiler-java/pom.xml b/flink-cyber/flink-profiler-java/pom.xml index 0fa01edeb..15f1fae45 100644 --- a/flink-cyber/flink-profiler-java/pom.xml +++ b/flink-cyber/flink-profiler-java/pom.xml @@ -37,6 +37,14 @@ org.apache.flink flink-connector-hbase-${flink.hbase.version} + + org.apache.flink + flink-connector-cloudera-registry + + + com.hortonworks.registries + schema-registry-serdes + com.cloudera.cyber flink-profiler diff --git a/flink-cyber/flink-sessions/pom.xml b/flink-cyber/flink-sessions/pom.xml index 358f64472..5e2fc78ff 100644 --- a/flink-cyber/flink-sessions/pom.xml +++ b/flink-cyber/flink-sessions/pom.xml @@ -35,7 +35,7 @@ org.apache.flink - flink-cloudera-registry + flink-connector-cloudera-registry diff --git a/flink-cyber/flink-stellar/pom.xml b/flink-cyber/flink-stellar/pom.xml index b153e0a34..4ed271838 100644 --- a/flink-cyber/flink-stellar/pom.xml +++ b/flink-cyber/flink-stellar/pom.xml @@ -35,7 +35,7 @@ UTF-8 1.10 ${guava.version} - 4.3.0.${cdh.version} + 5.4.0.${cdh.version} 0.66.19 2.7.4 1.1.1 diff --git a/flink-cyber/metron-parser-chain/parser-chains-parsers/pom.xml b/flink-cyber/metron-parser-chain/parser-chains-parsers/pom.xml index 8da4ed140..3d8a39291 100644 --- a/flink-cyber/metron-parser-chain/parser-chains-parsers/pom.xml +++ b/flink-cyber/metron-parser-chain/parser-chains-parsers/pom.xml @@ -27,7 +27,6 @@ ${antlr.version} 0.1.9 - 3.2.0 @@ -194,7 +193,7 @@ org.apache.maven.plugins maven-shade-plugin - ${global_shade_version} + ${maven-shade-plugin.version} true diff --git a/flink-cyber/metron-parser-chain/pom.xml b/flink-cyber/metron-parser-chain/pom.xml index f8e0c29e4..fab881908 100644 --- a/flink-cyber/metron-parser-chain/pom.xml +++ b/flink-cyber/metron-parser-chain/pom.xml @@ -46,7 +46,6 @@ 1.8 - 3.2.1 3.8.1 3.2.0 3.1.1 diff --git a/flink-cyber/parser-chains-flink/pom.xml b/flink-cyber/parser-chains-flink/pom.xml index 366efe323..650b1be65 100644 --- a/flink-cyber/parser-chains-flink/pom.xml +++ b/flink-cyber/parser-chains-flink/pom.xml @@ -33,6 +33,7 @@ org.apache.flink flink-streaming-java + org.apache.flink flink-clients @@ -51,6 +52,28 @@ hbase-client ${hbase.version} compile + + + org.slf4j + jul-to-slf4j + + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + + + ch.qos.logback + logback-classic + + + org.slf4j + slf4j-reload4j + + @@ -84,6 +107,13 @@ flink-parquet + + com.cloudera.cyber + flink-logging + ${project.parent.version} + compile + + org.apache.parquet parquet-avro @@ -98,6 +128,26 @@ it.unimi.dsi fastutil + + org.slf4j + jul-to-slf4j + + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + + + ch.qos.logback + logback-classic + + + org.slf4j + slf4j-reload4j + @@ -106,6 +156,24 @@ parser-chains-core ${project.parent.version} compile + + + org.slf4j + jul-to-slf4j + + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + + + ch.qos.logback + logback-classic + + @@ -120,38 +188,6 @@ multiline-string - - - org.slf4j - slf4j-api - ${slf4j.version} - - - - org.apache.logging.log4j - log4j-slf4j-impl - ${log4j.version} - - - - org.apache.logging.log4j - log4j-api - ${log4j.version} - - - - org.apache.logging.log4j - log4j-core - ${log4j.version} - - - - - org.apache.logging.log4j - log4j-1.2-api - ${log4j.version} - - org.assertj assertj-core @@ -243,7 +279,7 @@ META-INF/*.SF META-INF/*.DSA META-INF/*.RSA - + diff --git a/flink-cyber/pom.xml b/flink-cyber/pom.xml index 6265f53e7..c2a055faa 100644 --- a/flink-cyber/pom.xml +++ b/flink-cyber/pom.xml @@ -54,16 +54,18 @@ UTF-8 1.8 - 1.16.2-csadh1.10.0.100 + csa1.12.0.0 + 1.18.0-csa1.12.0.0 + 1.0-${csa.version} 2.4 - 7.2.17.0-334 + 7.1.9.0-387 1.18.22 1.18.16.0 21.0 - 1.11.1 - 3.4.0.${cdh.version} + 1.11.3 + 3.4.1.${cdh.version} 2.12 - 3.4.0.${cdh.version} + 3.4.1.${cdh.version} 1.11.4 2.11.2 2.10.1 @@ -73,15 +75,15 @@ 2.17.2 0.1.2 1.15.0 - 3.2.4 + 3.3.0 3.2.0 3.0.0-M5 3.0.0-M5 3.3.1 - 8.4.1.${cdh.version} + 8.11.2.${cdh.version} 2.13.1 3.1.1.${cdh.version} - 2.4.6.${cdh.version} + 2.4.17.${cdh.version} 3.6.0 1.3.0-incubating 3.13.2 @@ -158,13 +160,23 @@ org.apache.flink - flink-connector-kafka + flink-conector-base ${flink.version} + + + + org.apache.flink + flink-connector-kafka + 3.1-${csa.version} org.apache.kafka kafka-clients + + com.fasterxml.jackson.core + jackson-core + @@ -238,7 +250,7 @@ org.apache.flink flink-connector-hbase-${flink.hbase.version} - ${flink.version} + 3.0-${csa.version} org.apache.hbase @@ -261,8 +273,8 @@ org.apache.flink - flink-cloudera-registry - ${flink.version} + flink-connector-cloudera-registry + 1.0-${csa.version} org.apache.avro @@ -334,12 +346,6 @@ ${flink.version} - - org.apache.flink - flink-connector-elasticsearch7 - ${flink.version} - - org.apache.solr solr-solrj @@ -411,7 +417,8 @@ com.hortonworks.registries schema-registry-serdes - 0.9.1 + 0.10.0.${cdh.version} + compile org.slf4j @@ -455,6 +462,13 @@ ${jackson.version} + + com.fasterxml.jackson.core + jackson-core + compile + ${jackson.version} + + commons-validator commons-validator @@ -668,7 +682,7 @@ org.apache.flink flink-connector-kafka - ${flink.version} + 3.1-${csa.version} From 63dcbfb8fa6be6a3aed1b749f0dcb7411af52752 Mon Sep 17 00:00:00 2001 From: cduby Date: Thu, 28 Mar 2024 20:59:25 -0400 Subject: [PATCH 09/19] [CYB-207] fix indexing job serialization error --- flink-cyber/flink-cyber-api/pom.xml | 1 - .../flink-indexing-hive/pom.xml | 7 ++++ .../hive/tableapi/impl/MapRowToAvro.java | 40 +++++++++++++++++++ .../hive/tableapi/impl/TableApiKafkaJob.java | 13 +----- .../flink-indexing-parquet/pom.xml | 1 - 5 files changed, 48 insertions(+), 14 deletions(-) create mode 100644 flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/MapRowToAvro.java diff --git a/flink-cyber/flink-cyber-api/pom.xml b/flink-cyber/flink-cyber-api/pom.xml index 5dddfd865..2d72699bc 100644 --- a/flink-cyber/flink-cyber-api/pom.xml +++ b/flink-cyber/flink-cyber-api/pom.xml @@ -77,7 +77,6 @@ org.apache.avro avro - ${avro.version} diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml index fa1c75fe5..355e6344d 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml +++ b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml @@ -71,6 +71,11 @@ provided + + org.apache.flink + flink-avro + + org.apache.flink @@ -404,6 +409,7 @@ org.slf4j:* log4j:* org.apache.logging.log4j:* + org.apache.avro:avro @@ -415,6 +421,7 @@ META-INF/*.SF META-INF/*.DSA META-INF/*.RSA + org/apache/avro/** diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/MapRowToAvro.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/MapRowToAvro.java new file mode 100644 index 000000000..935c1c1bd --- /dev/null +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/MapRowToAvro.java @@ -0,0 +1,40 @@ +package com.cloudera.cyber.indexing.hive.tableapi.impl; + +import com.cloudera.cyber.indexing.hive.util.AvroSchemaUtil; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.types.Row; + +import java.util.Set; + +public class MapRowToAvro implements ResultTypeQueryable, MapFunction { + private final GenericRecordAvroTypeInfo producedType; + private final Schema schema; + + public MapRowToAvro(String schemaString) { + this.schema = new Schema.Parser().parse(schemaString); + this.producedType = new GenericRecordAvroTypeInfo(schema); + } + + @Override + public TypeInformation getProducedType() { + return producedType; + } + + @Override + public GenericRecord map(Row row) throws Exception { + final GenericRecord record = new GenericData.Record(schema); + final Set fieldNames = row.getFieldNames(true); + if (fieldNames != null) { + for (String fieldName : fieldNames) { + AvroSchemaUtil.putRowIntoAvro(row, record, fieldName); + } + } + return record; + } +} diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java index 921bb6b34..fdc0a6a81 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/main/java/com/cloudera/cyber/indexing/hive/tableapi/impl/TableApiKafkaJob.java @@ -55,18 +55,7 @@ protected void executeInsert(StreamTableEnvironment tableEnv, Map stream = tableEnv.toDataStream(table).map(row -> { - final Schema schema = new Schema.Parser().parse(schemaString); - final GenericRecord record = new GenericData.Record(schema); - final Set fieldNames = row.getFieldNames(true); - if (fieldNames != null) { - for (String fieldName : fieldNames) { - AvroSchemaUtil.putRowIntoAvro(row, record, fieldName); - } - } - - return record; - }); + final DataStream stream = tableEnv.toDataStream(table).map(new MapRowToAvro(schemaString)); stream.sinkTo(kafkaSink); System.out.printf("Insert SQL added to the queue for the table: %s%nSQL: %s%n", mappingDto.getTableName(), insertSql); diff --git a/flink-cyber/flink-indexing/flink-indexing-parquet/pom.xml b/flink-cyber/flink-indexing/flink-indexing-parquet/pom.xml index 4dff1411d..a2487fc53 100644 --- a/flink-cyber/flink-indexing/flink-indexing-parquet/pom.xml +++ b/flink-cyber/flink-indexing/flink-indexing-parquet/pom.xml @@ -57,7 +57,6 @@ org.apache.avro avro - ${avro.version} org.apache.flink From 9f1033cf194d9e90657fa33d3cd900c68ea75748 Mon Sep 17 00:00:00 2001 From: cduby Date: Fri, 29 Mar 2024 11:47:00 -0400 Subject: [PATCH 10/19] [CYB-207] add missing jupiter dependency. use jupiter for all tests --- .../flink-indexing/flink-indexing-hive/pom.xml | 13 +++++++++++-- .../indexing/hive/TimestampNormalizerTest.java | 16 +++++++++------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml index 355e6344d..e0b4efd16 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml +++ b/flink-cyber/flink-indexing/flink-indexing-hive/pom.xml @@ -358,15 +358,18 @@ - junit - junit + org.junit.jupiter + junit-jupiter test + ${jupiter.junit.version} + org.hamcrest hamcrest test + org.mockito mockito-core @@ -441,6 +444,12 @@ org.apache.maven.plugins maven-compiler-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + \ No newline at end of file diff --git a/flink-cyber/flink-indexing/flink-indexing-hive/src/test/java/com/cloudera/cyber/indexing/hive/TimestampNormalizerTest.java b/flink-cyber/flink-indexing/flink-indexing-hive/src/test/java/com/cloudera/cyber/indexing/hive/TimestampNormalizerTest.java index ad7a5b294..7ded82282 100644 --- a/flink-cyber/flink-indexing/flink-indexing-hive/src/test/java/com/cloudera/cyber/indexing/hive/TimestampNormalizerTest.java +++ b/flink-cyber/flink-indexing/flink-indexing-hive/src/test/java/com/cloudera/cyber/indexing/hive/TimestampNormalizerTest.java @@ -13,13 +13,15 @@ package com.cloudera.cyber.indexing.hive; import com.cloudera.cyber.MessageUtils; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.text.SimpleDateFormat; import java.time.Instant; import java.util.Date; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + public class TimestampNormalizerTest { private final SimpleDateFormat hiveDateFormatter = new SimpleDateFormat(HiveStreamingMessageWriter.HIVE_DATE_FORMAT); @@ -31,24 +33,24 @@ public void testLongTimestamp() { long currentMillis = MessageUtils.getCurrentTimestamp(); String expectedTimestamp = hiveDateFormatter.format(Date.from(Instant.ofEpochMilli(currentMillis))); String fieldValue = Long.toString(currentMillis); - Assert.assertEquals(expectedTimestamp, normalizer.apply(fieldValue)); + assertEquals(expectedTimestamp, normalizer.apply(fieldValue)); } @Test public void testFirstFormattedTimestamp() { String fieldValue = "2020-11-19 22:00:01.000000"; - Assert.assertEquals("2020-11-19 22:00:01.000", normalizer.apply(fieldValue)); + assertEquals("2020-11-19 22:00:01.000", normalizer.apply(fieldValue)); } @Test public void testLastFormattedTimestamp() { String fieldValue = "2020-01-15T23:05:33Z"; - Assert.assertEquals("2020-01-15 23:05:33.000", normalizer.apply(fieldValue)); + assertEquals("2020-01-15 23:05:33.000", normalizer.apply(fieldValue)); } - @Test(expected=IllegalStateException.class) + @Test public void testTimestampWithoutMatch() { String fieldValue = "not a timestamp"; - normalizer.apply(fieldValue); + assertThrows(IllegalStateException.class, () -> normalizer.apply(fieldValue)); } } From 311d841fbf44397a6053f31fec34e0cedf3ff05c Mon Sep 17 00:00:00 2001 From: cduby Date: Fri, 29 Mar 2024 13:48:56 -0400 Subject: [PATCH 11/19] [CYB-207] remove deprecated forkMode for rest enrichments module --- .../flink-enrichment/flink-enrichment-lookup-rest/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml index 6287bc60f..18cde2c2f 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml @@ -201,8 +201,8 @@ maven-surefire-plugin ${maven-surefire-plugin.version} - - always + 1 + false From 800bcf6b93c7064d1c787da1dc9b6f689fe724ae Mon Sep 17 00:00:00 2001 From: cduby Date: Mon, 1 Apr 2024 10:33:23 -0400 Subject: [PATCH 12/19] [CYB-207] add junit dependency --- .../flink-enrichment/flink-enrichment-lookup-rest/pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml index 18cde2c2f..57f1f57e3 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml @@ -123,6 +123,10 @@ ${global.httpclient.version} compile + + junit + junit + org.mock-server mockserver-netty From 1fe52232ee91b6e755e0ca0225cb11a1c39e3e6d Mon Sep 17 00:00:00 2001 From: cduby Date: Tue, 2 Apr 2024 15:48:02 -0400 Subject: [PATCH 13/19] [CYB-207] use https for mock server for all tests. --- .../enrichment/rest/AsyncHttpRequest.java | 1 - .../cyber/enrichment/rest/RestLookupJob.java | 5 +- .../enrichment/rest/AsyncHttpRequestTest.java | 14 ++-- .../enrichment/rest/GetRestRequestTest.java | 29 ++++---- .../rest/NonSecureGetRequestTest.java | 29 -------- .../rest/NonSecurePostRestRequestTest.java | 30 --------- .../enrichment/rest/PostRestRequestTest.java | 67 ++++++++++++++----- .../enrichment/rest/RestLookupJobTest.java | 6 +- .../enrichment/rest/RestRequestTest.java | 12 +++- .../enrichment/rest/SecureGetRequestTest.java | 30 --------- .../rest/SecurePostRestRequestTest.java | 63 ----------------- .../enrichment/rest/impl/MockRestServer.java | 41 +++++++----- 12 files changed, 110 insertions(+), 217 deletions(-) delete mode 100644 flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecureGetRequestTest.java delete mode 100644 flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecurePostRestRequestTest.java delete mode 100644 flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecureGetRequestTest.java delete mode 100644 flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecurePostRestRequestTest.java diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/AsyncHttpRequest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/AsyncHttpRequest.java index 03a0f19b7..39592c442 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/AsyncHttpRequest.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/AsyncHttpRequest.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.function.Predicate; import java.util.List; import static com.cloudera.cyber.DataQualityMessageLevel.ERROR; diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/RestLookupJob.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/RestLookupJob.java index 1097142fa..cb81726fc 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/RestLookupJob.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/RestLookupJob.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -30,6 +29,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Base64; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -60,7 +60,7 @@ private static DataStream enrich(DataStream source, List enrich(DataStream source, List expectedQualityMessages = Collections.singletonList(DataQualityMessage.builder(). feature(AsyncHttpRequest.REST_ENRICHMENT_FEATURE). @@ -179,7 +177,7 @@ private void testGetAsset(String fieldName, String assetId, Map } private void testGetAsset(ArrayList enrichmentSources, String messageSource, String fieldName, String assetId, Map expectedExtensions, Collection expectedDataQualityMessages) throws Exception { - AsyncHttpRequest request = new AsyncHttpRequest(mockRestServer.configureGetAssetRequest().sources(enrichmentSources).build()); + AsyncHttpRequest request = new AsyncHttpRequest(mockRestServer.configureGetAssetRequest(MockRestServer.ENCRYPTED_PROTOCOL).sources(enrichmentSources).build()); request.open(new Configuration()); AsyncResult result = new AsyncResult(); diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/GetRestRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/GetRestRequestTest.java index 7d0a11cc1..1e55c9344 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/GetRestRequestTest.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/GetRestRequestTest.java @@ -15,25 +15,30 @@ import com.cloudera.cyber.enrichment.rest.impl.MockRestServer; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Ignore; +import org.junit.BeforeClass; import org.junit.Test; import java.util.HashMap; import java.util.Map; @Slf4j -// tests will run either with or without tls when junit runs derived classes -@Ignore public class GetRestRequestTest extends RestRequestTest { - public static void createMockService(boolean enableTlsMutualAuth) { - startMockServer(enableTlsMutualAuth); + @BeforeClass + public static void createMockServices() { + startMockServer(); + } + + @AfterClass + public static void stopMockServices() { + stopMockServer(); } @Test public void testSimpleGet() throws Exception { - RestEnrichmentConfig config = mockRestServer.configureGetUserRequest().build(); + RestEnrichmentConfig config = mockRestServer.configureGetUserRequest(MockRestServer.ENCRYPTED_PROTOCOL).build(); Map variables = new HashMap() {{ put("name", "Chris"); @@ -44,7 +49,7 @@ public void testSimpleGet() throws Exception { @Test public void testPropertiesNull() throws Exception { - RestEnrichmentConfig config = mockRestServer.getBuilder(null).sources(Lists.newArrayList(MockRestServer.USER_SOURCE)). + RestEnrichmentConfig config = mockRestServer.getBuilder(null, MockRestServer.ENCRYPTED_PROTOCOL).sources(Lists.newArrayList(MockRestServer.USER_SOURCE)). endpointTemplate(String.format("%s://%s/user?name=${name}", mockRestServer.getMockProtocol(), mockRestServer.getMockHostAndPort())).build(); Map variables = new HashMap() {{ @@ -57,18 +62,18 @@ public void testPropertiesNull() throws Exception { @Test public void testBasicAuthGet() throws Exception { - RestRequestResult result = makeAssetRequest(MockRestServer.ASSET_ID); + RestRequestResult result = makeAssetRequest(MockRestServer.ASSET_ID, mockRestServer); Assert.assertEquals(MockRestServer.ASSET_LOCATION, result.getExtensions().get(MockRestServer.ASSET_LOCATION_PROPERTY)); } @Test public void testServerError() throws Exception { - RestRequestResult result = makeAssetRequest(MockRestServer.SERVER_ERROR_ASSET_ID); - verifyErrorResult(result, "Rest request url='%s://%s/asset?id=56' entity='null' failed 'Rest request failed due to 'HTTP/1.1 503 Service Unavailable'.'"); + RestRequestResult result = makeAssetRequest(MockRestServer.SERVER_ERROR_ASSET_ID, mockRestServer); + verifyErrorResult(result, "Rest request url='%s://%s/asset?id=56' entity='null' failed 'Rest request failed due to 'HTTP/1.1 503 Service Unavailable'.'", mockRestServer); } - private RestRequestResult makeAssetRequest(String assetId) throws Exception { - RestEnrichmentConfig config = mockRestServer.configureGetAssetRequest().build(); + private RestRequestResult makeAssetRequest(String assetId, MockRestServer mockRestServer) throws Exception { + RestEnrichmentConfig config = mockRestServer.configureGetAssetRequest(MockRestServer.ENCRYPTED_PROTOCOL).build(); Map variables = new HashMap() {{ put("id", assetId); }}; diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecureGetRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecureGetRequestTest.java deleted file mode 100644 index 3217a5d7b..000000000 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecureGetRequestTest.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2020 - 2022 Cloudera. All Rights Reserved. - * - * This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. Refer to the License for the specific permissions and - * limitations governing your use of the file. - */ - -package com.cloudera.cyber.enrichment.rest; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -public class NonSecureGetRequestTest extends GetRestRequestTest { - @BeforeClass - public static void createMockService() { - createMockService(false); - } - - @AfterClass - public static void stopMockServer() { - mockRestServer.close(); - } - -} \ No newline at end of file diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecurePostRestRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecurePostRestRequestTest.java deleted file mode 100644 index 04d8620a6..000000000 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecurePostRestRequestTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2020 - 2022 Cloudera. All Rights Reserved. - * - * This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. Refer to the License for the specific permissions and - * limitations governing your use of the file. - */ - -package com.cloudera.cyber.enrichment.rest; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -public class NonSecurePostRestRequestTest extends PostRestRequestTest { - - @BeforeClass - public static void createMockService() { - createMockService(false); - } - - @AfterClass - public static void stopMockServer() { - mockRestServer.close(); - } - -} diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/PostRestRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/PostRestRequestTest.java index f07443499..f12f4f615 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/PostRestRequestTest.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/PostRestRequestTest.java @@ -15,8 +15,10 @@ import com.cloudera.cyber.enrichment.rest.impl.MockRestServer; import com.google.common.collect.Lists; import org.apache.http.client.methods.HttpPost; +import org.hamcrest.Matcher; +import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Ignore; +import org.junit.BeforeClass; import org.junit.Test; import javax.annotation.Nonnull; @@ -28,14 +30,21 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; -// tests will run either with or without tls when junit runs derived classes -@Ignore +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertThat; + public class PostRestRequestTest extends RestRequestTest { private static RestEnrichmentConfig modelResultPostRequest; - public static void createMockService(boolean enableTlsMutualAuth) { - startMockServer(enableTlsMutualAuth); - modelResultPostRequest = mockRestServer.configureModelPostRequest().build(); + @BeforeClass + public static void createMockServices() { + startMockServer(); + modelResultPostRequest = mockRestServer.configureModelPostRequest(MockRestServer.ENCRYPTED_PROTOCOL).build(); + } + + @AfterClass + public static void stopMockServices() { + stopMockServer(); } @Test @@ -43,7 +52,7 @@ public void testSimplePostCModel() throws Exception { testSuccessfulModelPosts(modelResultPostRequest); } - protected void testSuccessfulModelPosts(RestEnrichmentConfig config) throws Exception { + private void testSuccessfulModelPosts(RestEnrichmentConfig config) throws Exception { for(MockRestServer.ExpectedModelResult expectedModelResult : MockRestServer.expectedModelResults) { if (expectedModelResult.isSuccess()) { testDomain(config, expectedModelResult.getDomainName(), expectedModelResult.isLegit()); @@ -54,16 +63,16 @@ protected void testSuccessfulModelPosts(RestEnrichmentConfig config) throws Exce @Test public void testHttp200StatusButResultIndicatesError() throws Exception { String expectedErrorFormat = "Rest request url='%s://%s/model' entity='{\"accessKey\":\"mup8kz1hsl3erczwepbt8jupamita6y6\",\"request\":{\"domain\":\"unsuccessfuldomain\"}}' failed 'Rest request returned a success code but the content indicated failure.'"; - testHttpFailedRequest(MockRestServer.UNSUCCESSFUL_DOMAIN, expectedErrorFormat); - + testHttpFailedRequest(MockRestServer.UNSUCCESSFUL_DOMAIN, expectedErrorFormat, modelResultPostRequest, mockRestServer); } + @Test public void testHttp400Exception() throws Exception { String expectedErrorFormat = "Rest request url='%s://%s/model' entity='{\"accessKey\":\"mup8kz1hsl3erczwepbt8jupamita6y6\",\"request\":{\"domain\":\"clienterror\"}}' failed 'Rest request failed due to 'HTTP/1.1 400 Bad Request'.'"; - testHttpFailedRequest(MockRestServer.CLIENT_ERROR_DOMAIN, expectedErrorFormat); + testHttpFailedRequest(MockRestServer.CLIENT_ERROR_DOMAIN, expectedErrorFormat, modelResultPostRequest, mockRestServer); } - public void testHttpFailedRequest(String domainName, String errorMessageFormat) throws Exception { + private void testHttpFailedRequest(String domainName, String errorMessageFormat, RestEnrichmentConfig modelResultPostRequest, MockRestServer mockRestServer) throws Exception { RestRequest request = modelResultPostRequest.createRestEnrichmentRequest(); Map variables = new HashMap() { { @@ -71,7 +80,7 @@ public void testHttpFailedRequest(String domainName, String errorMessageFormat) } }; RestRequestResult result = request.getResult(true, variables).get(); - verifyErrorResult(result, errorMessageFormat); + verifyErrorResult(result, errorMessageFormat, mockRestServer); } private static class MockThrowingPostRequest extends PostRestRequest { @@ -87,30 +96,52 @@ protected void addEntityToRequest(@Nonnull HttpPost postRequest, @Nonnull String @Test public void testUnsupportedCharacterSetException() throws Exception { - MockThrowingPostRequest throwingPostRequest = new MockThrowingPostRequest(modelResultPostRequest); RestRequestKey key = throwingPostRequest.getKey(new HashMap() {{ put(MockRestServer.DOMAIN_EXTENSION_NAME, "mydomain");}}); CompletableFuture future = new MockThrowingPostRequest(modelResultPostRequest).asyncLoad(key, Executors.newFixedThreadPool(1)); RestRequestResult result = future.get(); String expectedErrorFormat = "Rest request url='%s://%s/model' entity='{\"accessKey\":\"mup8kz1hsl3erczwepbt8jupamita6y6\",\"request\":{\"domain\":\"mydomain\"}}' failed 'Default UnsupportedEndcodingException message'"; - verifyErrorResult(result, expectedErrorFormat); + verifyErrorResult(result, expectedErrorFormat, mockRestServer); } @Test public void testUndefinedVariableWithAnyMatchIgnoresKeyErrors() throws Exception { - RestEnrichmentConfig anyMatchRequest = mockRestServer.configureModelPostRequest().sources(Lists.newArrayList(AsyncHttpRequest.ANY_SOURCE_NAME)).build(); + RestEnrichmentConfig anyMatchRequest = mockRestServer.configureModelPostRequest(MockRestServer.ENCRYPTED_PROTOCOL).sources(Lists.newArrayList(AsyncHttpRequest.ANY_SOURCE_NAME)).build(); // accessing an undefined field in a key with any source match does not report an error testDomain(anyMatchRequest, new ArrayList<>()); } + @Test + public void testTlsWithAlias() throws Exception { + RestEnrichmentConfig.RestEnrichmentConfigBuilder modelResultPostRequestWithKeyAlias = mockRestServer.configureTLS(mockRestServer.configureModelPostRequest(MockRestServer.ENCRYPTED_PROTOCOL), "client"); + testSuccessfulModelPosts(modelResultPostRequestWithKeyAlias.build()); + } + + @Test + public void testTlsHandshakeException() throws Exception { + RestEnrichmentConfig.RestEnrichmentConfigBuilder modelResultPostRequestWithKeyAlias = mockRestServer.configureTLS(mockRestServer.configureModelPostRequest(MockRestServer.ENCRYPTED_PROTOCOL), "nosuchkeyalias"); + RestRequest badHandshakePost = new PostRestRequest(modelResultPostRequestWithKeyAlias.build()); + MockRestServer.ExpectedModelResult expectedResult = MockRestServer.expectedModelResults.get(0); + Map extensions = new HashMap() {{ + put(MockRestServer.DOMAIN_EXTENSION_NAME, expectedResult.getDomainName()); + }}; + RestRequestResult result = badHandshakePost.getResult(true, extensions).get(); + + Assert.assertTrue(result.getExtensions().isEmpty()); + List errors = result.getErrors(); + Assert.assertEquals(1, errors.size()); + Matcher expectedString = containsString(String.format("Rest request url='%s://%s/model' entity='{\"accessKey\":\"mup8kz1hsl3erczwepbt8jupamita6y6\",\"request\":{\"domain\":\"google\"}}'", mockRestServer.getMockProtocol(), mockRestServer.getMockHostAndPort())); + assertThat(errors.get(0), expectedString); + } + @Test public void testUndefinedVariableWithSpecificMatchReturnsKeyErrors() throws Exception { - RestEnrichmentConfig anyMatchRequest = mockRestServer.configureModelPostRequest().build(); + RestEnrichmentConfig anyMatchRequest = mockRestServer.configureModelPostRequest(MockRestServer.ENCRYPTED_PROTOCOL).build(); // accessing an undefined field in a key with a specific source match fails with a key error testDomain(anyMatchRequest, Lists.newArrayList("Variable(s) 'domain' required by rest entity are undefined")); } - protected void testDomain(RestEnrichmentConfig config, String domainName, Boolean legit) throws Exception { + private void testDomain(RestEnrichmentConfig config, String domainName, Boolean legit) throws Exception { Map variables = new HashMap() {{ put(MockRestServer.DOMAIN_EXTENSION_NAME, domainName); }}; @@ -119,7 +150,7 @@ protected void testDomain(RestEnrichmentConfig config, String domainName, Boolea Assert.assertTrue(result.getErrors().isEmpty()); } - protected void testDomain(RestEnrichmentConfig config, List expectedErrors) throws Exception { + private void testDomain(RestEnrichmentConfig config, List expectedErrors) throws Exception { Map variables = new HashMap() {{ put("wrong field name", "testdomain"); }}; diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestLookupJobTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestLookupJobTest.java index 0c92faf51..7a40cdf55 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestLookupJobTest.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestLookupJobTest.java @@ -59,9 +59,9 @@ public static void startMockRestServer() throws IOException { mockRestServer = new MockRestServer(true); File configFile = configTempFolder.newFile("rest-job-test.json"); List modelRestConfig = new ArrayList<>(); - modelRestConfig.add(mockRestServer.configureModelPostRequest().build()); - modelRestConfig.add(mockRestServer.configureGetAssetRequest().build()); - modelRestConfig.add(mockRestServer.configureGetUserRequest().build()); + modelRestConfig.add(mockRestServer.configureModelPostRequest(MockRestServer.ENCRYPTED_PROTOCOL).build()); + modelRestConfig.add(mockRestServer.configureGetAssetRequest(MockRestServer.ENCRYPTED_PROTOCOL).build()); + modelRestConfig.add(mockRestServer.configureGetUserRequest(MockRestServer.ENCRYPTED_PROTOCOL).build()); ObjectWriter ow = getConfigObjectMapper().writerFor(new TypeReference>() { }); ow.writeValue(configFile, modelRestConfig); diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestRequestTest.java index 211648c74..5059902f4 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestRequestTest.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestRequestTest.java @@ -22,8 +22,14 @@ public abstract class RestRequestTest { protected static MockRestServer mockRestServer; - protected static void startMockServer(boolean enableTlsMutualAuth) { - mockRestServer = new MockRestServer(enableTlsMutualAuth); + protected static void startMockServer() { + mockRestServer = new MockRestServer(true); + } + + protected static void stopMockServer() { + if (mockRestServer != null) { + mockRestServer.close(); + } } protected RestRequestResult makeRequest(RestEnrichmentConfig config, Map variables) throws Exception { @@ -32,7 +38,7 @@ protected RestRequestResult makeRequest(RestEnrichmentConfig config, Map errors = result.getErrors(); Assert.assertEquals(1, errors.size()); diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecureGetRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecureGetRequestTest.java deleted file mode 100644 index 7bf43a8b8..000000000 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecureGetRequestTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2020 - 2022 Cloudera. All Rights Reserved. - * - * This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. Refer to the License for the specific permissions and - * limitations governing your use of the file. - */ - -package com.cloudera.cyber.enrichment.rest; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -public class SecureGetRequestTest extends GetRestRequestTest { - - @BeforeClass - public static void createMockService() { - createMockService(true); - } - - @AfterClass - public static void stopMockServer() { - mockRestServer.close(); - } - -} diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecurePostRestRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecurePostRestRequestTest.java deleted file mode 100644 index 39f2ac099..000000000 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecurePostRestRequestTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2020 - 2022 Cloudera. All Rights Reserved. - * - * This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. Refer to the License for the specific permissions and - * limitations governing your use of the file. - */ - -package com.cloudera.cyber.enrichment.rest; - -import com.cloudera.cyber.enrichment.rest.impl.MockRestServer; -import org.hamcrest.Matcher; -import org.junit.*; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.hamcrest.Matchers.containsString; -import static org.junit.Assert.assertThat; - -public class SecurePostRestRequestTest extends PostRestRequestTest { - - private static final String KEY_ALIAS = "client"; - - @BeforeClass - public static void createMockService() { - createMockService(true); - } - - @AfterClass - public static void stopMockServer() { - mockRestServer.close(); - } - - @Test - public void testTlsWithAlias() throws Exception { - RestEnrichmentConfig.RestEnrichmentConfigBuilder modelResultPostRequestWithKeyAlias = mockRestServer.configureTLS(mockRestServer.configureModelPostRequest(), KEY_ALIAS); - testSuccessfulModelPosts(modelResultPostRequestWithKeyAlias.build()); - } - - @Test - public void testTlsHandshakeException() throws Exception { - RestEnrichmentConfig.RestEnrichmentConfigBuilder modelResultPostRequestWithKeyAlias = mockRestServer.configureTLS(mockRestServer.configureModelPostRequest(), "nosuchkeyalias"); - RestRequest badHandshakePost = new PostRestRequest(modelResultPostRequestWithKeyAlias.build()); - MockRestServer.ExpectedModelResult expectedResult = MockRestServer.expectedModelResults.get(0); - Map extensions = new HashMap() {{ - put(MockRestServer.DOMAIN_EXTENSION_NAME, expectedResult.getDomainName()); - }}; - RestRequestResult result = badHandshakePost.getResult(true, extensions).get(); - - Assert.assertTrue(result.getExtensions().isEmpty()); - List errors = result.getErrors(); - Assert.assertEquals(1, errors.size()); - Matcher expectedString = containsString(String.format("Rest request url='%s://%s/model' entity='{\"accessKey\":\"mup8kz1hsl3erczwepbt8jupamita6y6\",\"request\":{\"domain\":\"google\"}}'", mockRestServer.getMockProtocol(), mockRestServer.getMockHostAndPort())); - assertThat(errors.get(0), expectedString); - } - -} \ No newline at end of file diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/impl/MockRestServer.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/impl/MockRestServer.java index f3f4b534f..a22a3473c 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/impl/MockRestServer.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/impl/MockRestServer.java @@ -12,8 +12,8 @@ package com.cloudera.cyber.enrichment.rest.impl; -import com.google.common.base.Joiner; import com.cloudera.cyber.enrichment.rest.*; +import com.google.common.base.Joiner; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Data; @@ -30,7 +30,10 @@ import javax.net.ssl.HttpsURLConnection; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.mockserver.integration.ClientAndServer.startClientAndServer; import static org.mockserver.model.HttpRequest.request; @@ -81,6 +84,8 @@ public class MockRestServer { private static final String BEARER_TOKEN = "mybearertokenaae6b840d045b574d96e35e271419720d0d7645534da6d5ba3d.74c9e8867ef7e0750b5772671acf7e413a744f6d77507eac83584014c71c5866"; private static final int EXPECTED_CLIENT_ERROR_HTTP_CODE = 400; public static final String USER_NAME_PROPERTY = "name"; + public static final String ENCRYPTED_PROTOCOL = "https"; + private ClientAndServer mockServer; private String mockHostAndPort; private String mockProtocol; @@ -101,16 +106,18 @@ public MockRestServer(boolean enableTlsMutualAuth) { initializePostExpectations(); } - public RestEnrichmentConfig.RestEnrichmentConfigBuilder getBuilder(HashMap properties) { + public RestEnrichmentConfig.RestEnrichmentConfigBuilder getBuilder(HashMap properties, String protocol) { RestEnrichmentConfig.RestEnrichmentConfigBuilder builder = RestEnrichmentConfig.builder(). capacity(3); - boolean mutualTlsAuth = mockServer.isSecure(); - if (mutualTlsAuth) { - configureTLS(builder, null); + if (protocol.equals(ENCRYPTED_PROTOCOL)) { + boolean mutualTlsAuth = mockServer.isSecure(); + if (mutualTlsAuth) { + configureTLS(builder, null); + } } if (properties != null) { properties.put("server", mockHostAndPort); - properties.put("protocol", mockProtocol); + properties.put("protocol", protocol); builder.properties(properties); } @@ -127,12 +134,12 @@ public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureTLS(RestEnrichm keyPassword(KEY_PASSWORD).keyAlias(keyAlias).build()); } - public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureModelPostRequest() { + public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureModelPostRequest(String protocol) { HashMap authTokenProperties = new HashMap() {{ put("bearer_token", BEARER_TOKEN); put("access_key", ACCESS_KEY); }}; - return getBuilder(authTokenProperties). + return getBuilder(authTokenProperties, protocol). sources(Lists.newArrayList(DNS_SOURCE)). prefix(DGA_MODEL_PREFIX). endpointTemplate("${protocol}://${server}/model"). @@ -147,20 +154,20 @@ public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureModelPostReques } - public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureGetUserRequest() { - return getBuilder(new HashMap<>()). + public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureGetUserRequest(String protocol) { + return getBuilder(new HashMap<>(), protocol). sources(Lists.newArrayList(USER_SOURCE)). prefix(USER_PREFIX). endpointTemplate("${protocol}://${server}/user?name=${name}"); } - public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureGetAssetRequest() { + public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureGetAssetRequest(String protocol) { HashMap properties = new HashMap() {{ put("user", BASIC_USER_NAME); put("password", BASIC_PASSWORD); }}; - return getBuilder(properties). + return getBuilder(properties, protocol). sources(Lists.newArrayList(ASSET_SOURCE)). prefix(ASSET_PREFIX). endpointTemplate("${protocol}://${server}/asset?id=${id}"). @@ -218,7 +225,7 @@ private void initializePostExpectations() { for (ExpectedModelResult expectedModelResult : expectedModelResults) { addModelResultExpectation(expectedModelResult.isSuccess(), expectedModelResult.getDomainName(), expectedModelResult.isLegit()); } - addModelResultClientErrorExpectation(CLIENT_ERROR_DOMAIN, EXPECTED_CLIENT_ERROR_HTTP_CODE); + addModelResultClientErrorExpectation(); } private void addModelResultExpectation(boolean success, String domain, boolean legit) { @@ -237,18 +244,18 @@ private void addModelResultExpectation(boolean success, String domain, boolean l ); } - private void addModelResultClientErrorExpectation(String domain, int statusCode) { + private void addModelResultClientErrorExpectation() { mockServer.when( request(). withMethod("POST"). withPath("/model"). withContentType(MediaType.APPLICATION_JSON). withHeader(HttpHeaders.AUTHORIZATION, "Bearer ".concat(BEARER_TOKEN)). - withBody(json(String.format("{\"accessKey\":\"%s\",\"request\":{\"domain\":\"%s\"}}", ACCESS_KEY, domain), MatchType.STRICT)) + withBody(json(String.format("{\"accessKey\":\"%s\",\"request\":{\"domain\":\"%s\"}}", ACCESS_KEY, MockRestServer.CLIENT_ERROR_DOMAIN), MatchType.STRICT)) ) .respond( response() - .withStatusCode(statusCode) + .withStatusCode(MockRestServer.EXPECTED_CLIENT_ERROR_HTTP_CODE) ); } From ba0a0a9979e4c40819c997af5267c4335c4a72f6 Mon Sep 17 00:00:00 2001 From: cduby Date: Wed, 3 Apr 2024 09:11:52 -0400 Subject: [PATCH 14/19] [CYB-207] temporary change to run just problem module --- .github/workflows/build_and_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index c93a241f7..7ce2bfe25 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -28,7 +28,7 @@ jobs: restore-keys: | ${{ runner.os }}-maven- - name: Build and Test with Maven - run: mvn -B package --file flink-cyber/pom.xml + run: mvn -B package -am -pl flink-enrichment/flink-enrichment-lookup-rest --file flink-cyber/pom.xml # Optional: Uploads the full dependency graph to GitHub to improve the quality of Dependabot alerts this repository can receive - name: Update dependency graph uses: advanced-security/maven-dependency-submission-action@571e99aab1055c2e71a1e2309b9691de18d6b7d6 From f4fee5bad7ca83089fb8a7f2bd5aa885d29b616c Mon Sep 17 00:00:00 2001 From: cduby Date: Wed, 3 Apr 2024 19:35:40 -0400 Subject: [PATCH 15/19] [CYB-207] update mockserver dependency --- .../flink-enrichment/flink-enrichment-lookup-rest/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml index 57f1f57e3..26f45cfaa 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml @@ -129,8 +129,8 @@ org.mock-server - mockserver-netty - 5.11.1 + mockserver-netty-no-dependencies + RELEASE test From 31cf004c06e1b180fc5601e0c078357514ca0288 Mon Sep 17 00:00:00 2001 From: Carolyn Duby Date: Thu, 4 Apr 2024 15:06:19 +0000 Subject: [PATCH 16/19] Revert "[CYB-207] temporary change to run just problem module" This reverts commit ba0a0a9979e4c40819c997af5267c4335c4a72f6. --- .github/workflows/build_and_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 7ce2bfe25..c93a241f7 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -28,7 +28,7 @@ jobs: restore-keys: | ${{ runner.os }}-maven- - name: Build and Test with Maven - run: mvn -B package -am -pl flink-enrichment/flink-enrichment-lookup-rest --file flink-cyber/pom.xml + run: mvn -B package --file flink-cyber/pom.xml # Optional: Uploads the full dependency graph to GitHub to improve the quality of Dependabot alerts this repository can receive - name: Update dependency graph uses: advanced-security/maven-dependency-submission-action@571e99aab1055c2e71a1e2309b9691de18d6b7d6 From 9e7791d7e8601d0d32ae20f77a933f3b8b24c6a4 Mon Sep 17 00:00:00 2001 From: Carolyn Duby Date: Thu, 4 Apr 2024 15:06:26 +0000 Subject: [PATCH 17/19] Revert "[CYB-207] use https for mock server for all tests." This reverts commit 1fe52232ee91b6e755e0ca0225cb11a1c39e3e6d. --- .../enrichment/rest/AsyncHttpRequest.java | 1 + .../cyber/enrichment/rest/RestLookupJob.java | 5 +- .../enrichment/rest/AsyncHttpRequestTest.java | 14 ++-- .../enrichment/rest/GetRestRequestTest.java | 29 ++++---- .../rest/NonSecureGetRequestTest.java | 29 ++++++++ .../rest/NonSecurePostRestRequestTest.java | 30 +++++++++ .../enrichment/rest/PostRestRequestTest.java | 67 +++++-------------- .../enrichment/rest/RestLookupJobTest.java | 6 +- .../enrichment/rest/RestRequestTest.java | 12 +--- .../enrichment/rest/SecureGetRequestTest.java | 30 +++++++++ .../rest/SecurePostRestRequestTest.java | 63 +++++++++++++++++ .../enrichment/rest/impl/MockRestServer.java | 41 +++++------- 12 files changed, 217 insertions(+), 110 deletions(-) create mode 100644 flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecureGetRequestTest.java create mode 100644 flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecurePostRestRequestTest.java create mode 100644 flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecureGetRequestTest.java create mode 100644 flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecurePostRestRequestTest.java diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/AsyncHttpRequest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/AsyncHttpRequest.java index 39592c442..03a0f19b7 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/AsyncHttpRequest.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/AsyncHttpRequest.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.function.Predicate; import java.util.List; import static com.cloudera.cyber.DataQualityMessageLevel.ERROR; diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/RestLookupJob.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/RestLookupJob.java index cb81726fc..1097142fa 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/RestLookupJob.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/main/java/com/cloudera/cyber/enrichment/rest/RestLookupJob.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -29,7 +30,6 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Base64; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -60,7 +60,7 @@ private static DataStream enrich(DataStream source, List enrich(DataStream source, List expectedQualityMessages = Collections.singletonList(DataQualityMessage.builder(). feature(AsyncHttpRequest.REST_ENRICHMENT_FEATURE). @@ -177,7 +179,7 @@ private void testGetAsset(String fieldName, String assetId, Map } private void testGetAsset(ArrayList enrichmentSources, String messageSource, String fieldName, String assetId, Map expectedExtensions, Collection expectedDataQualityMessages) throws Exception { - AsyncHttpRequest request = new AsyncHttpRequest(mockRestServer.configureGetAssetRequest(MockRestServer.ENCRYPTED_PROTOCOL).sources(enrichmentSources).build()); + AsyncHttpRequest request = new AsyncHttpRequest(mockRestServer.configureGetAssetRequest().sources(enrichmentSources).build()); request.open(new Configuration()); AsyncResult result = new AsyncResult(); diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/GetRestRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/GetRestRequestTest.java index 1e55c9344..7d0a11cc1 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/GetRestRequestTest.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/GetRestRequestTest.java @@ -15,30 +15,25 @@ import com.cloudera.cyber.enrichment.rest.impl.MockRestServer; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import java.util.HashMap; import java.util.Map; @Slf4j +// tests will run either with or without tls when junit runs derived classes +@Ignore public class GetRestRequestTest extends RestRequestTest { - @BeforeClass - public static void createMockServices() { - startMockServer(); - } - - @AfterClass - public static void stopMockServices() { - stopMockServer(); + public static void createMockService(boolean enableTlsMutualAuth) { + startMockServer(enableTlsMutualAuth); } @Test public void testSimpleGet() throws Exception { - RestEnrichmentConfig config = mockRestServer.configureGetUserRequest(MockRestServer.ENCRYPTED_PROTOCOL).build(); + RestEnrichmentConfig config = mockRestServer.configureGetUserRequest().build(); Map variables = new HashMap() {{ put("name", "Chris"); @@ -49,7 +44,7 @@ public void testSimpleGet() throws Exception { @Test public void testPropertiesNull() throws Exception { - RestEnrichmentConfig config = mockRestServer.getBuilder(null, MockRestServer.ENCRYPTED_PROTOCOL).sources(Lists.newArrayList(MockRestServer.USER_SOURCE)). + RestEnrichmentConfig config = mockRestServer.getBuilder(null).sources(Lists.newArrayList(MockRestServer.USER_SOURCE)). endpointTemplate(String.format("%s://%s/user?name=${name}", mockRestServer.getMockProtocol(), mockRestServer.getMockHostAndPort())).build(); Map variables = new HashMap() {{ @@ -62,18 +57,18 @@ public void testPropertiesNull() throws Exception { @Test public void testBasicAuthGet() throws Exception { - RestRequestResult result = makeAssetRequest(MockRestServer.ASSET_ID, mockRestServer); + RestRequestResult result = makeAssetRequest(MockRestServer.ASSET_ID); Assert.assertEquals(MockRestServer.ASSET_LOCATION, result.getExtensions().get(MockRestServer.ASSET_LOCATION_PROPERTY)); } @Test public void testServerError() throws Exception { - RestRequestResult result = makeAssetRequest(MockRestServer.SERVER_ERROR_ASSET_ID, mockRestServer); - verifyErrorResult(result, "Rest request url='%s://%s/asset?id=56' entity='null' failed 'Rest request failed due to 'HTTP/1.1 503 Service Unavailable'.'", mockRestServer); + RestRequestResult result = makeAssetRequest(MockRestServer.SERVER_ERROR_ASSET_ID); + verifyErrorResult(result, "Rest request url='%s://%s/asset?id=56' entity='null' failed 'Rest request failed due to 'HTTP/1.1 503 Service Unavailable'.'"); } - private RestRequestResult makeAssetRequest(String assetId, MockRestServer mockRestServer) throws Exception { - RestEnrichmentConfig config = mockRestServer.configureGetAssetRequest(MockRestServer.ENCRYPTED_PROTOCOL).build(); + private RestRequestResult makeAssetRequest(String assetId) throws Exception { + RestEnrichmentConfig config = mockRestServer.configureGetAssetRequest().build(); Map variables = new HashMap() {{ put("id", assetId); }}; diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecureGetRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecureGetRequestTest.java new file mode 100644 index 000000000..3217a5d7b --- /dev/null +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecureGetRequestTest.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020 - 2022 Cloudera. All Rights Reserved. + * + * This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. Refer to the License for the specific permissions and + * limitations governing your use of the file. + */ + +package com.cloudera.cyber.enrichment.rest; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public class NonSecureGetRequestTest extends GetRestRequestTest { + @BeforeClass + public static void createMockService() { + createMockService(false); + } + + @AfterClass + public static void stopMockServer() { + mockRestServer.close(); + } + +} \ No newline at end of file diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecurePostRestRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecurePostRestRequestTest.java new file mode 100644 index 000000000..04d8620a6 --- /dev/null +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/NonSecurePostRestRequestTest.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020 - 2022 Cloudera. All Rights Reserved. + * + * This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. Refer to the License for the specific permissions and + * limitations governing your use of the file. + */ + +package com.cloudera.cyber.enrichment.rest; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public class NonSecurePostRestRequestTest extends PostRestRequestTest { + + @BeforeClass + public static void createMockService() { + createMockService(false); + } + + @AfterClass + public static void stopMockServer() { + mockRestServer.close(); + } + +} diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/PostRestRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/PostRestRequestTest.java index f12f4f615..f07443499 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/PostRestRequestTest.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/PostRestRequestTest.java @@ -15,10 +15,8 @@ import com.cloudera.cyber.enrichment.rest.impl.MockRestServer; import com.google.common.collect.Lists; import org.apache.http.client.methods.HttpPost; -import org.hamcrest.Matcher; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import javax.annotation.Nonnull; @@ -30,21 +28,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; -import static org.hamcrest.Matchers.containsString; -import static org.junit.Assert.assertThat; - +// tests will run either with or without tls when junit runs derived classes +@Ignore public class PostRestRequestTest extends RestRequestTest { private static RestEnrichmentConfig modelResultPostRequest; - @BeforeClass - public static void createMockServices() { - startMockServer(); - modelResultPostRequest = mockRestServer.configureModelPostRequest(MockRestServer.ENCRYPTED_PROTOCOL).build(); - } - - @AfterClass - public static void stopMockServices() { - stopMockServer(); + public static void createMockService(boolean enableTlsMutualAuth) { + startMockServer(enableTlsMutualAuth); + modelResultPostRequest = mockRestServer.configureModelPostRequest().build(); } @Test @@ -52,7 +43,7 @@ public void testSimplePostCModel() throws Exception { testSuccessfulModelPosts(modelResultPostRequest); } - private void testSuccessfulModelPosts(RestEnrichmentConfig config) throws Exception { + protected void testSuccessfulModelPosts(RestEnrichmentConfig config) throws Exception { for(MockRestServer.ExpectedModelResult expectedModelResult : MockRestServer.expectedModelResults) { if (expectedModelResult.isSuccess()) { testDomain(config, expectedModelResult.getDomainName(), expectedModelResult.isLegit()); @@ -63,16 +54,16 @@ private void testSuccessfulModelPosts(RestEnrichmentConfig config) throws Except @Test public void testHttp200StatusButResultIndicatesError() throws Exception { String expectedErrorFormat = "Rest request url='%s://%s/model' entity='{\"accessKey\":\"mup8kz1hsl3erczwepbt8jupamita6y6\",\"request\":{\"domain\":\"unsuccessfuldomain\"}}' failed 'Rest request returned a success code but the content indicated failure.'"; - testHttpFailedRequest(MockRestServer.UNSUCCESSFUL_DOMAIN, expectedErrorFormat, modelResultPostRequest, mockRestServer); - } + testHttpFailedRequest(MockRestServer.UNSUCCESSFUL_DOMAIN, expectedErrorFormat); + } @Test public void testHttp400Exception() throws Exception { String expectedErrorFormat = "Rest request url='%s://%s/model' entity='{\"accessKey\":\"mup8kz1hsl3erczwepbt8jupamita6y6\",\"request\":{\"domain\":\"clienterror\"}}' failed 'Rest request failed due to 'HTTP/1.1 400 Bad Request'.'"; - testHttpFailedRequest(MockRestServer.CLIENT_ERROR_DOMAIN, expectedErrorFormat, modelResultPostRequest, mockRestServer); + testHttpFailedRequest(MockRestServer.CLIENT_ERROR_DOMAIN, expectedErrorFormat); } - private void testHttpFailedRequest(String domainName, String errorMessageFormat, RestEnrichmentConfig modelResultPostRequest, MockRestServer mockRestServer) throws Exception { + public void testHttpFailedRequest(String domainName, String errorMessageFormat) throws Exception { RestRequest request = modelResultPostRequest.createRestEnrichmentRequest(); Map variables = new HashMap() { { @@ -80,7 +71,7 @@ private void testHttpFailedRequest(String domainName, String errorMessageFormat, } }; RestRequestResult result = request.getResult(true, variables).get(); - verifyErrorResult(result, errorMessageFormat, mockRestServer); + verifyErrorResult(result, errorMessageFormat); } private static class MockThrowingPostRequest extends PostRestRequest { @@ -96,52 +87,30 @@ protected void addEntityToRequest(@Nonnull HttpPost postRequest, @Nonnull String @Test public void testUnsupportedCharacterSetException() throws Exception { + MockThrowingPostRequest throwingPostRequest = new MockThrowingPostRequest(modelResultPostRequest); RestRequestKey key = throwingPostRequest.getKey(new HashMap() {{ put(MockRestServer.DOMAIN_EXTENSION_NAME, "mydomain");}}); CompletableFuture future = new MockThrowingPostRequest(modelResultPostRequest).asyncLoad(key, Executors.newFixedThreadPool(1)); RestRequestResult result = future.get(); String expectedErrorFormat = "Rest request url='%s://%s/model' entity='{\"accessKey\":\"mup8kz1hsl3erczwepbt8jupamita6y6\",\"request\":{\"domain\":\"mydomain\"}}' failed 'Default UnsupportedEndcodingException message'"; - verifyErrorResult(result, expectedErrorFormat, mockRestServer); + verifyErrorResult(result, expectedErrorFormat); } @Test public void testUndefinedVariableWithAnyMatchIgnoresKeyErrors() throws Exception { - RestEnrichmentConfig anyMatchRequest = mockRestServer.configureModelPostRequest(MockRestServer.ENCRYPTED_PROTOCOL).sources(Lists.newArrayList(AsyncHttpRequest.ANY_SOURCE_NAME)).build(); + RestEnrichmentConfig anyMatchRequest = mockRestServer.configureModelPostRequest().sources(Lists.newArrayList(AsyncHttpRequest.ANY_SOURCE_NAME)).build(); // accessing an undefined field in a key with any source match does not report an error testDomain(anyMatchRequest, new ArrayList<>()); } - @Test - public void testTlsWithAlias() throws Exception { - RestEnrichmentConfig.RestEnrichmentConfigBuilder modelResultPostRequestWithKeyAlias = mockRestServer.configureTLS(mockRestServer.configureModelPostRequest(MockRestServer.ENCRYPTED_PROTOCOL), "client"); - testSuccessfulModelPosts(modelResultPostRequestWithKeyAlias.build()); - } - - @Test - public void testTlsHandshakeException() throws Exception { - RestEnrichmentConfig.RestEnrichmentConfigBuilder modelResultPostRequestWithKeyAlias = mockRestServer.configureTLS(mockRestServer.configureModelPostRequest(MockRestServer.ENCRYPTED_PROTOCOL), "nosuchkeyalias"); - RestRequest badHandshakePost = new PostRestRequest(modelResultPostRequestWithKeyAlias.build()); - MockRestServer.ExpectedModelResult expectedResult = MockRestServer.expectedModelResults.get(0); - Map extensions = new HashMap() {{ - put(MockRestServer.DOMAIN_EXTENSION_NAME, expectedResult.getDomainName()); - }}; - RestRequestResult result = badHandshakePost.getResult(true, extensions).get(); - - Assert.assertTrue(result.getExtensions().isEmpty()); - List errors = result.getErrors(); - Assert.assertEquals(1, errors.size()); - Matcher expectedString = containsString(String.format("Rest request url='%s://%s/model' entity='{\"accessKey\":\"mup8kz1hsl3erczwepbt8jupamita6y6\",\"request\":{\"domain\":\"google\"}}'", mockRestServer.getMockProtocol(), mockRestServer.getMockHostAndPort())); - assertThat(errors.get(0), expectedString); - } - @Test public void testUndefinedVariableWithSpecificMatchReturnsKeyErrors() throws Exception { - RestEnrichmentConfig anyMatchRequest = mockRestServer.configureModelPostRequest(MockRestServer.ENCRYPTED_PROTOCOL).build(); + RestEnrichmentConfig anyMatchRequest = mockRestServer.configureModelPostRequest().build(); // accessing an undefined field in a key with a specific source match fails with a key error testDomain(anyMatchRequest, Lists.newArrayList("Variable(s) 'domain' required by rest entity are undefined")); } - private void testDomain(RestEnrichmentConfig config, String domainName, Boolean legit) throws Exception { + protected void testDomain(RestEnrichmentConfig config, String domainName, Boolean legit) throws Exception { Map variables = new HashMap() {{ put(MockRestServer.DOMAIN_EXTENSION_NAME, domainName); }}; @@ -150,7 +119,7 @@ private void testDomain(RestEnrichmentConfig config, String domainName, Boolean Assert.assertTrue(result.getErrors().isEmpty()); } - private void testDomain(RestEnrichmentConfig config, List expectedErrors) throws Exception { + protected void testDomain(RestEnrichmentConfig config, List expectedErrors) throws Exception { Map variables = new HashMap() {{ put("wrong field name", "testdomain"); }}; diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestLookupJobTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestLookupJobTest.java index 7a40cdf55..0c92faf51 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestLookupJobTest.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestLookupJobTest.java @@ -59,9 +59,9 @@ public static void startMockRestServer() throws IOException { mockRestServer = new MockRestServer(true); File configFile = configTempFolder.newFile("rest-job-test.json"); List modelRestConfig = new ArrayList<>(); - modelRestConfig.add(mockRestServer.configureModelPostRequest(MockRestServer.ENCRYPTED_PROTOCOL).build()); - modelRestConfig.add(mockRestServer.configureGetAssetRequest(MockRestServer.ENCRYPTED_PROTOCOL).build()); - modelRestConfig.add(mockRestServer.configureGetUserRequest(MockRestServer.ENCRYPTED_PROTOCOL).build()); + modelRestConfig.add(mockRestServer.configureModelPostRequest().build()); + modelRestConfig.add(mockRestServer.configureGetAssetRequest().build()); + modelRestConfig.add(mockRestServer.configureGetUserRequest().build()); ObjectWriter ow = getConfigObjectMapper().writerFor(new TypeReference>() { }); ow.writeValue(configFile, modelRestConfig); diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestRequestTest.java index 5059902f4..211648c74 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestRequestTest.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/RestRequestTest.java @@ -22,14 +22,8 @@ public abstract class RestRequestTest { protected static MockRestServer mockRestServer; - protected static void startMockServer() { - mockRestServer = new MockRestServer(true); - } - - protected static void stopMockServer() { - if (mockRestServer != null) { - mockRestServer.close(); - } + protected static void startMockServer(boolean enableTlsMutualAuth) { + mockRestServer = new MockRestServer(enableTlsMutualAuth); } protected RestRequestResult makeRequest(RestEnrichmentConfig config, Map variables) throws Exception { @@ -38,7 +32,7 @@ protected RestRequestResult makeRequest(RestEnrichmentConfig config, Map errors = result.getErrors(); Assert.assertEquals(1, errors.size()); diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecureGetRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecureGetRequestTest.java new file mode 100644 index 000000000..7bf43a8b8 --- /dev/null +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecureGetRequestTest.java @@ -0,0 +1,30 @@ +/* + * Copyright 2020 - 2022 Cloudera. All Rights Reserved. + * + * This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. Refer to the License for the specific permissions and + * limitations governing your use of the file. + */ + +package com.cloudera.cyber.enrichment.rest; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public class SecureGetRequestTest extends GetRestRequestTest { + + @BeforeClass + public static void createMockService() { + createMockService(true); + } + + @AfterClass + public static void stopMockServer() { + mockRestServer.close(); + } + +} diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecurePostRestRequestTest.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecurePostRestRequestTest.java new file mode 100644 index 000000000..39f2ac099 --- /dev/null +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/SecurePostRestRequestTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 2020 - 2022 Cloudera. All Rights Reserved. + * + * This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. Refer to the License for the specific permissions and + * limitations governing your use of the file. + */ + +package com.cloudera.cyber.enrichment.rest; + +import com.cloudera.cyber.enrichment.rest.impl.MockRestServer; +import org.hamcrest.Matcher; +import org.junit.*; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertThat; + +public class SecurePostRestRequestTest extends PostRestRequestTest { + + private static final String KEY_ALIAS = "client"; + + @BeforeClass + public static void createMockService() { + createMockService(true); + } + + @AfterClass + public static void stopMockServer() { + mockRestServer.close(); + } + + @Test + public void testTlsWithAlias() throws Exception { + RestEnrichmentConfig.RestEnrichmentConfigBuilder modelResultPostRequestWithKeyAlias = mockRestServer.configureTLS(mockRestServer.configureModelPostRequest(), KEY_ALIAS); + testSuccessfulModelPosts(modelResultPostRequestWithKeyAlias.build()); + } + + @Test + public void testTlsHandshakeException() throws Exception { + RestEnrichmentConfig.RestEnrichmentConfigBuilder modelResultPostRequestWithKeyAlias = mockRestServer.configureTLS(mockRestServer.configureModelPostRequest(), "nosuchkeyalias"); + RestRequest badHandshakePost = new PostRestRequest(modelResultPostRequestWithKeyAlias.build()); + MockRestServer.ExpectedModelResult expectedResult = MockRestServer.expectedModelResults.get(0); + Map extensions = new HashMap() {{ + put(MockRestServer.DOMAIN_EXTENSION_NAME, expectedResult.getDomainName()); + }}; + RestRequestResult result = badHandshakePost.getResult(true, extensions).get(); + + Assert.assertTrue(result.getExtensions().isEmpty()); + List errors = result.getErrors(); + Assert.assertEquals(1, errors.size()); + Matcher expectedString = containsString(String.format("Rest request url='%s://%s/model' entity='{\"accessKey\":\"mup8kz1hsl3erczwepbt8jupamita6y6\",\"request\":{\"domain\":\"google\"}}'", mockRestServer.getMockProtocol(), mockRestServer.getMockHostAndPort())); + assertThat(errors.get(0), expectedString); + } + +} \ No newline at end of file diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/impl/MockRestServer.java b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/impl/MockRestServer.java index a22a3473c..f3f4b534f 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/impl/MockRestServer.java +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/src/test/java/com/cloudera/cyber/enrichment/rest/impl/MockRestServer.java @@ -12,8 +12,8 @@ package com.cloudera.cyber.enrichment.rest.impl; -import com.cloudera.cyber.enrichment.rest.*; import com.google.common.base.Joiner; +import com.cloudera.cyber.enrichment.rest.*; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Data; @@ -30,10 +30,7 @@ import javax.net.ssl.HttpsURLConnection; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static org.mockserver.integration.ClientAndServer.startClientAndServer; import static org.mockserver.model.HttpRequest.request; @@ -84,8 +81,6 @@ public class MockRestServer { private static final String BEARER_TOKEN = "mybearertokenaae6b840d045b574d96e35e271419720d0d7645534da6d5ba3d.74c9e8867ef7e0750b5772671acf7e413a744f6d77507eac83584014c71c5866"; private static final int EXPECTED_CLIENT_ERROR_HTTP_CODE = 400; public static final String USER_NAME_PROPERTY = "name"; - public static final String ENCRYPTED_PROTOCOL = "https"; - private ClientAndServer mockServer; private String mockHostAndPort; private String mockProtocol; @@ -106,18 +101,16 @@ public MockRestServer(boolean enableTlsMutualAuth) { initializePostExpectations(); } - public RestEnrichmentConfig.RestEnrichmentConfigBuilder getBuilder(HashMap properties, String protocol) { + public RestEnrichmentConfig.RestEnrichmentConfigBuilder getBuilder(HashMap properties) { RestEnrichmentConfig.RestEnrichmentConfigBuilder builder = RestEnrichmentConfig.builder(). capacity(3); - if (protocol.equals(ENCRYPTED_PROTOCOL)) { - boolean mutualTlsAuth = mockServer.isSecure(); - if (mutualTlsAuth) { - configureTLS(builder, null); - } + boolean mutualTlsAuth = mockServer.isSecure(); + if (mutualTlsAuth) { + configureTLS(builder, null); } if (properties != null) { properties.put("server", mockHostAndPort); - properties.put("protocol", protocol); + properties.put("protocol", mockProtocol); builder.properties(properties); } @@ -134,12 +127,12 @@ public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureTLS(RestEnrichm keyPassword(KEY_PASSWORD).keyAlias(keyAlias).build()); } - public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureModelPostRequest(String protocol) { + public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureModelPostRequest() { HashMap authTokenProperties = new HashMap() {{ put("bearer_token", BEARER_TOKEN); put("access_key", ACCESS_KEY); }}; - return getBuilder(authTokenProperties, protocol). + return getBuilder(authTokenProperties). sources(Lists.newArrayList(DNS_SOURCE)). prefix(DGA_MODEL_PREFIX). endpointTemplate("${protocol}://${server}/model"). @@ -154,20 +147,20 @@ public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureModelPostReques } - public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureGetUserRequest(String protocol) { - return getBuilder(new HashMap<>(), protocol). + public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureGetUserRequest() { + return getBuilder(new HashMap<>()). sources(Lists.newArrayList(USER_SOURCE)). prefix(USER_PREFIX). endpointTemplate("${protocol}://${server}/user?name=${name}"); } - public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureGetAssetRequest(String protocol) { + public RestEnrichmentConfig.RestEnrichmentConfigBuilder configureGetAssetRequest() { HashMap properties = new HashMap() {{ put("user", BASIC_USER_NAME); put("password", BASIC_PASSWORD); }}; - return getBuilder(properties, protocol). + return getBuilder(properties). sources(Lists.newArrayList(ASSET_SOURCE)). prefix(ASSET_PREFIX). endpointTemplate("${protocol}://${server}/asset?id=${id}"). @@ -225,7 +218,7 @@ private void initializePostExpectations() { for (ExpectedModelResult expectedModelResult : expectedModelResults) { addModelResultExpectation(expectedModelResult.isSuccess(), expectedModelResult.getDomainName(), expectedModelResult.isLegit()); } - addModelResultClientErrorExpectation(); + addModelResultClientErrorExpectation(CLIENT_ERROR_DOMAIN, EXPECTED_CLIENT_ERROR_HTTP_CODE); } private void addModelResultExpectation(boolean success, String domain, boolean legit) { @@ -244,18 +237,18 @@ private void addModelResultExpectation(boolean success, String domain, boolean l ); } - private void addModelResultClientErrorExpectation() { + private void addModelResultClientErrorExpectation(String domain, int statusCode) { mockServer.when( request(). withMethod("POST"). withPath("/model"). withContentType(MediaType.APPLICATION_JSON). withHeader(HttpHeaders.AUTHORIZATION, "Bearer ".concat(BEARER_TOKEN)). - withBody(json(String.format("{\"accessKey\":\"%s\",\"request\":{\"domain\":\"%s\"}}", ACCESS_KEY, MockRestServer.CLIENT_ERROR_DOMAIN), MatchType.STRICT)) + withBody(json(String.format("{\"accessKey\":\"%s\",\"request\":{\"domain\":\"%s\"}}", ACCESS_KEY, domain), MatchType.STRICT)) ) .respond( response() - .withStatusCode(MockRestServer.EXPECTED_CLIENT_ERROR_HTTP_CODE) + .withStatusCode(statusCode) ); } From 056e7ea7cd05f123e0aeb858c382083f7bd5003e Mon Sep 17 00:00:00 2001 From: Carolyn Duby Date: Thu, 4 Apr 2024 16:26:33 +0000 Subject: [PATCH 18/19] [CYB-207] Update mockserver dependency to remove conflicts --- .../flink-enrichment/flink-enrichment-lookup-rest/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml index 57f1f57e3..26f45cfaa 100644 --- a/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml +++ b/flink-cyber/flink-enrichment/flink-enrichment-lookup-rest/pom.xml @@ -129,8 +129,8 @@ org.mock-server - mockserver-netty - 5.11.1 + mockserver-netty-no-dependencies + RELEASE test From 629a2c1b67f788b4c655d772eeeeaa2ac7950aa5 Mon Sep 17 00:00:00 2001 From: Stas Panasiuk Date: Mon, 29 Apr 2024 17:11:01 +0200 Subject: [PATCH 19/19] typo fix --- flink-cyber/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cyber/pom.xml b/flink-cyber/pom.xml index 621090ee1..a84ce30cd 100644 --- a/flink-cyber/pom.xml +++ b/flink-cyber/pom.xml @@ -54,7 +54,7 @@ UTF-8 - 1.8 + 1.8 csa1.12.0.0 1.18.0-csa1.12.0.0 1.0-${csa.version}