[CYB-213] Codestyle fixes + linter added #80

Open · wants to merge 20 commits into base: develop
Changes from all commits
4 changes: 3 additions & 1 deletion .github/workflows/build_and_test.yml
@@ -27,8 +27,10 @@ jobs:
           key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
           restore-keys: |
             ${{ runner.os }}-maven-
+      - name: Check codestyle
+        run: mvn -B validate --file flink-cyber/pom.xml
       - name: Build and Test with Maven
-        run: mvn -P '!add-dependencies-for-IDEA,!full-build,!include-front-end' -B package --file flink-cyber/pom.xml
+        run: mvn -P '!add-dependencies-for-IDEA,!full-build,!include-front-end' -B verify --file flink-cyber/pom.xml
 
       # Optional: Uploads the full dependency graph to GitHub to improve the quality of Dependabot alerts this repository can receive
       - name: Update dependency graph
1,460 changes: 1,460 additions & 0 deletions flink-cyber/.editorconfig

Large diffs are not rendered by default.

CaracalGeneratorFlinkJob.java
@@ -20,6 +20,13 @@
 import com.cloudera.cyber.libs.networking.IPLocal;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
@@ -30,21 +37,6 @@
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
 public abstract class CaracalGeneratorFlinkJob {
 
     public static final String PARAMS_RECORDS_LIMIT = "generator.count";
@@ -61,7 +53,9 @@ public abstract class CaracalGeneratorFlinkJob {
     protected static final String DPI_SMTP_TOPIC = "dpi_smtp";
     protected static final String GENERATOR_AVRO_TOPIC = "generator.avro";
     protected static final String THREAT_TOPIC_NAME = "threats";
-    protected static final String AVRO_WITH_CUSTOM_CONFIG_ERROR = String.format("'%s' should not be specified when '%s' is true. Select either a custom generation config file or generate the default avro.", PARAMS_GENERATOR_CONFIG, PARAMS_SCHEMA);
+    protected static final String AVRO_WITH_CUSTOM_CONFIG_ERROR = String.format(
+        "'%s' should not be specified when '%s' is true. Select either a custom generation config file or generate the default avro.",
+        PARAMS_GENERATOR_CONFIG, PARAMS_SCHEMA);
 
     public StreamExecutionEnvironment createPipeline(ParameterTool params) throws IOException {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -84,19 +78,20 @@ private GeneratorConfig getGeneratorConfig(ParameterTool params) throws IOException {
         GeneratorConfig generatorConfig = new GeneratorConfig();
 
         if (avroGeneratorFlag && generatorConfigFile == null) {
-            generatorConfig.setGenerationSources(Collections
-                .singletonList(
-                    new GenerationSource("Netflow/netflow_avro_sample1.json", GENERATOR_AVRO_TOPIC, SCHEMA_PATH, 1.0)));
+            generatorConfig.setGenerationSources(Collections
+                .singletonList(
+                    new GenerationSource("Netflow/netflow_avro_sample1.json", GENERATOR_AVRO_TOPIC, SCHEMA_PATH,
+                        1.0)));
         } else if (generatorConfigFile == null) {
             generatorConfig.setGenerationSources(getNetflowSampleMap());
         } else {
             Preconditions.checkState(!avroGeneratorFlag, AVRO_WITH_CUSTOM_CONFIG_ERROR);
             Path configPath = new Path(generatorConfigFile);
             try (InputStream configStream = configPath.getFileSystem().open(configPath)) {
                 generatorConfig = new ObjectMapper().readValue(
-                    configStream,
-                    new TypeReference<GeneratorConfig>() {
-                    });
+                    configStream,
+                    new TypeReference<GeneratorConfig>() {
+                    });
             }
         }
         generatorConfig.open();
@@ -105,33 +100,36 @@ private GeneratorConfig getGeneratorConfig(ParameterTool params) throws IOException {
     }
 
     private SingleOutputStreamOperator<Tuple2<String, Integer>> generateMetrics(
-        SingleOutputStreamOperator<Tuple2<String, byte[]>> generatedInput) {
+            SingleOutputStreamOperator<Tuple2<String, byte[]>> generatedInput) {
         return generatedInput
-            .map(new MapFunction<Tuple2<String, byte[]>, Tuple2<String,Integer>>() {
-                @Override
-                public Tuple2<String, Integer> map(Tuple2<String, byte[]> stringStringTuple2) {
-                    return Tuple2.of(stringStringTuple2.f0, 1);
-                }
-            })
-            .keyBy(value -> value.f0)
-            .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
-            .sum(1);
+            .map(new MapFunction<Tuple2<String, byte[]>, Tuple2<String, Integer>>() {
+                @Override
+                public Tuple2<String, Integer> map(Tuple2<String, byte[]> stringStringTuple2) {
+                    return Tuple2.of(stringStringTuple2.f0, 1);
+                }
+            })
+            .keyBy(value -> value.f0)
+            .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
+            .sum(1);
     }
 
     private void generateRandomThreatResults(ParameterTool params,
-        SingleOutputStreamOperator<Tuple2<String, byte[]>> generatedInput, GeneratorConfig generatorConfig) {
+                                             SingleOutputStreamOperator<Tuple2<String, byte[]>> generatedInput,
+                                             GeneratorConfig generatorConfig) {
 
         if (params.get(PARAMS_GENERATOR_CONFIG) == null) {
             // add random threat intelligence for a sample of the generated IPs
             IPLocal localIp = new IPLocal();
 
-            List<String> characterTopics = generatorConfig.getGenerationSources().stream().filter(gs -> Objects.isNull(gs.getOutputAvroSchema())).
-                map(GenerationSource::getTopic).collect(Collectors.toList());
-            SingleOutputStreamOperator<Tuple2<String, byte[]>> threats = generatedInput.filter(t -> characterTopics.contains(t.f0))
-                .map(new GetIpMap())
-                .filter(f -> f != null && !localIp.eval(f))
-                .filter(new RandomSampler<>(THREAT_PROBABILITY))
-                .map(new ThreatGeneratorMap(THREAT_TOPIC_NAME));
+            List<String> characterTopics = generatorConfig.getGenerationSources().stream()
+                .filter(gs -> Objects.isNull(gs.getOutputAvroSchema()))
+                .map(GenerationSource::getTopic).collect(Collectors.toList());
+            SingleOutputStreamOperator<Tuple2<String, byte[]>> threats =
+                generatedInput.filter(t -> characterTopics.contains(t.f0))
+                    .map(new GetIpMap())
+                    .filter(f -> f != null && !localIp.eval(f))
+                    .filter(new RandomSampler<>(THREAT_PROBABILITY))
+                    .map(new ThreatGeneratorMap(THREAT_TOPIC_NAME));
             writeResults(params, threats);
         }
     }
@@ -145,30 +143,36 @@ private List<GenerationSource> getNetflowSampleMap() {
         outputs.add(new GenerationSource("Netflow/netflow_sample_b.json", NETFLOW_B_TOPIC, null, 1.0));
         outputs.add(new GenerationSource("Netflow/netflow_sample_b_error.json", NETFLOW_B_TOPIC, null, 1.0));
 
-        outputs.add(new GenerationSource("DPI_Logs/Metadata_Module/http/http_sample_1.json", DPI_HTTP_TOPIC, null, 1.5));
-        outputs.add(new GenerationSource("DPI_Logs/Metadata_Module/http/http_sample_2.json", DPI_HTTP_TOPIC, null, 1.0));
-        outputs.add(new GenerationSource("DPI_Logs/Metadata_Module/http/http_sample_3.json", DPI_HTTP_TOPIC, null, 1.0));
-        outputs.add(new GenerationSource("DPI_Logs/Metadata_Module/http/http_sample_4.json", DPI_HTTP_TOPIC, null, 1.0));
+        outputs.add(
+            new GenerationSource("DPI_Logs/Metadata_Module/http/http_sample_1.json", DPI_HTTP_TOPIC, null, 1.5));
+        outputs.add(
+            new GenerationSource("DPI_Logs/Metadata_Module/http/http_sample_2.json", DPI_HTTP_TOPIC, null, 1.0));
+        outputs.add(
+            new GenerationSource("DPI_Logs/Metadata_Module/http/http_sample_3.json", DPI_HTTP_TOPIC, null, 1.0));
+        outputs.add(
+            new GenerationSource("DPI_Logs/Metadata_Module/http/http_sample_4.json", DPI_HTTP_TOPIC, null, 1.0));
 
         outputs.add(new GenerationSource("DPI_Logs/Metadata_Module/DNS/dns_sample_1.json", DPI_DNS_TOPIC, null, 1.0));
         outputs.add(new GenerationSource("DPI_Logs/Metadata_Module/DNS/dns_sample_2.json", DPI_DNS_TOPIC, null, 1.0));
         outputs.add(new GenerationSource("DPI_Logs/Metadata_Module/DNS/dns_sample_3.json", DPI_DNS_TOPIC, null, 1.0));
 
-        outputs.add(new GenerationSource("DPI_Logs/Metadata_Module/SMTP/smtp_sample_1.json", DPI_SMTP_TOPIC, null, 1.0));
+        outputs.add(
+            new GenerationSource("DPI_Logs/Metadata_Module/SMTP/smtp_sample_1.json", DPI_SMTP_TOPIC, null, 1.0));
         return outputs;
     }
 
     private SingleOutputStreamOperator<Tuple2<String, byte[]>> createSourceFromTemplateSource(ParameterTool params,
-        StreamExecutionEnvironment env, GeneratorConfig outputs) {
+                                                                                              StreamExecutionEnvironment env,
+                                                                                              GeneratorConfig outputs) {
         return env.addSource(new FreemarkerTemplateSource(
-            outputs, params.getLong(PARAMS_RECORDS_LIMIT, -1), params.getInt(PARAMS_EPS, DEFAULT_EPS)))
-            .name("Weighted Data Source");
+                outputs, params.getLong(PARAMS_RECORDS_LIMIT, -1), params.getInt(PARAMS_EPS, DEFAULT_EPS)))
+            .name("Weighted Data Source");
     }
 
     protected abstract void writeMetrics(ParameterTool params,
-        SingleOutputStreamOperator<Tuple2<String, Integer>> metrics);
+                                         SingleOutputStreamOperator<Tuple2<String, Integer>> metrics);
 
     protected abstract void writeResults(ParameterTool params,
-        SingleOutputStreamOperator<Tuple2<String, byte[]>> generatedInput);
+                                         SingleOutputStreamOperator<Tuple2<String, byte[]>> generatedInput);
 
 }
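
For context, generateMetrics reduces each generated (topic, payload) record to (topic, 1), keys the stream by topic, and sums counts over 30-second processing-time tumbling windows. A minimal self-contained sketch of that pattern, assuming a Flink 1.x streaming dependency on the classpath (the class name and inline sample data are illustrative, not part of this PR):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TopicCountSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // stand-in for the generator output, already reduced to (topic, 1) pairs
        DataStream<Tuple2<String, Integer>> counts = env
            .fromElements(Tuple2.of("netflow", 1), Tuple2.of("netflow", 1), Tuple2.of("dpi_http", 1))
            .keyBy(value -> value.f0)                                    // group by topic name
            .window(TumblingProcessingTimeWindows.of(Time.seconds(30))) // 30-second processing-time windows
            .sum(1);                                                     // per-topic count within each window
        counts.print();
        env.execute("topic count sketch");
    }
}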
CaracalGeneratorFlinkJobKafka.java
@@ -12,7 +12,10 @@
 
 package com.cloudera.cyber.test.generator;
 
+import static com.cloudera.cyber.flink.Utils.readKafkaProperties;
+
 import com.cloudera.cyber.flink.FlinkUtils;
+import java.nio.charset.StandardCharsets;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.connector.base.DeliveryGuarantee;
@@ -21,11 +24,6 @@
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.util.Preconditions;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import static com.cloudera.cyber.flink.Utils.readKafkaProperties;
-
-import java.nio.charset.StandardCharsets;
-
-import java.nio.charset.StandardCharsets;
 
 public class CaracalGeneratorFlinkJobKafka extends CaracalGeneratorFlinkJob {
 
@@ -35,34 +33,42 @@ public static void main(String[] args) throws Exception {
         Preconditions.checkArgument(args.length >= 1, "Arguments must consist of a properties files");
         ParameterTool params = com.cloudera.cyber.flink.Utils.getParamToolsFromProperties(args);
         FlinkUtils.executeEnv(new CaracalGeneratorFlinkJobKafka()
-            .createPipeline(params), "Caracal Data generator", params);
+                .createPipeline(params), "Caracal Data generator", params);
     }
 
     @Override
     protected void writeMetrics(ParameterTool params, SingleOutputStreamOperator<Tuple2<String, Integer>> metrics) {
-        KafkaSink<Tuple2<String, Integer>> metricsSink = KafkaSink.<Tuple2<String, Integer>>builder().setRecordSerializer(
-            (KafkaRecordSerializationSchema<Tuple2<String, Integer>>) (stringIntegerTuple2, kafkaSinkContext, timestamp) -> new ProducerRecord<>(
-                params.get("generator.metrics", "generator.metrics"),
-                null,
-                timestamp,
-                stringIntegerTuple2.f0.getBytes(StandardCharsets.UTF_8),
-                stringIntegerTuple2.f1.toString().getBytes(StandardCharsets.UTF_8)
-            )).setKafkaProducerConfig(
-            readKafkaProperties(params, PRODUCER_ID_PREFIX.concat("generator.metrics"), false)).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();
+        KafkaSink<Tuple2<String, Integer>> metricsSink =
+            KafkaSink.<Tuple2<String, Integer>>builder().setRecordSerializer(
+                    (KafkaRecordSerializationSchema<Tuple2<String, Integer>>)
+                        (stringIntegerTuple2, kafkaSinkContext, timestamp) -> new ProducerRecord<>(
+                            params.get("generator.metrics", "generator.metrics"),
+                            null,
+                            timestamp,
+                            stringIntegerTuple2.f0.getBytes(StandardCharsets.UTF_8),
+                            stringIntegerTuple2.f1.toString().getBytes(StandardCharsets.UTF_8)
+                        )).setKafkaProducerConfig(
+                    readKafkaProperties(params, PRODUCER_ID_PREFIX.concat("generator.metrics"), false))
+                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();
         metrics.sinkTo(metricsSink).name("Metrics Sink");
    }
 
    @Override
    protected void writeResults(ParameterTool params,
-        SingleOutputStreamOperator<Tuple2<String, byte[]>> generatedInput) {
-        KafkaSink<Tuple2<String, byte[]>> kafkaSink = KafkaSink.<Tuple2<String, byte[]>>builder().setRecordSerializer(
-            (KafkaRecordSerializationSchema<Tuple2<String, byte[]>>) (stringStringTuple2, kafkaSinkContext, aLong) -> new ProducerRecord<>(
-                stringStringTuple2.f0,
-                stringStringTuple2.f1
-            )).
-            setKafkaProducerConfig(readKafkaProperties(params, PRODUCER_ID_PREFIX.concat("generator.output"), false)).
-            setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).
-            build();
+                                SingleOutputStreamOperator<Tuple2<String, byte[]>> generatedInput) {
+        KafkaSink<Tuple2<String, byte[]>> kafkaSink =
+            KafkaSink.<Tuple2<String, byte[]>>builder()
+                .setRecordSerializer(
+                    (KafkaRecordSerializationSchema<Tuple2<String, byte[]>>)
+                        (stringStringTuple2, kafkaSinkContext, longValue) -> new ProducerRecord<>(
+                            stringStringTuple2.f0,
+                            stringStringTuple2.f1
+                        ))
+                .setKafkaProducerConfig(readKafkaProperties(params,
+                    PRODUCER_ID_PREFIX.concat("generator.output"),
+                    false))
+                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+                .build();
 
         generatedInput.sinkTo(kafkaSink).name("Text Generator Sink");
     }
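
Both sinks above follow the same KafkaSink builder recipe: a record serializer that turns a tuple into a ProducerRecord, producer properties, and an at-least-once delivery guarantee. A distilled sketch of that recipe, assuming the flink-connector-kafka dependency (buildSink and its Properties argument are illustrative; in the PR the properties come from readKafkaProperties):

import java.util.Properties;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SinkSketch {
    // builds a sink that routes each (topic, payload) tuple to the topic named in f0
    static KafkaSink<Tuple2<String, byte[]>> buildSink(Properties producerProps) {
        return KafkaSink.<Tuple2<String, byte[]>>builder()
            .setRecordSerializer(
                (KafkaRecordSerializationSchema<Tuple2<String, byte[]>>)
                    (tuple, context, timestamp) -> new ProducerRecord<>(tuple.f0, tuple.f1))
            // producerProps must include bootstrap.servers for the builder to validate
            .setKafkaProducerConfig(producerProps)
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    }
}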
GetIpMap.java
@@ -12,16 +12,16 @@
 
 package com.cloudera.cyber.test.generator;
 
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-
 import java.nio.charset.Charset;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 public class GetIpMap implements MapFunction<Tuple2<String, byte[]>, String> {
 
-    private static final Pattern ipExtractPattern = Pattern.compile("([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3})");
+    private static final Pattern ipExtractPattern =
+        Pattern.compile("([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3})");
 
     @Override
     public String map(Tuple2<String, byte[]> s) {
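
GetIpMap scans each generated payload for the first IPv4-looking substring using the pattern above. A self-contained sketch of that extraction (the helper name and sample input are illustrative; note the regex is permissive and would also match out-of-range octets such as 999.1.1.1):

import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class IpExtractSketch {
    private static final Pattern IP_PATTERN =
        Pattern.compile("([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3})");

    // returns the first IPv4-looking substring, or null when none is present
    static String firstIp(byte[] payload) {
        Matcher matcher = IP_PATTERN.matcher(new String(payload, StandardCharsets.UTF_8));
        return matcher.find() ? matcher.group(1) : null;
    }

    public static void main(String[] args) {
        byte[] record = "src=10.0.0.5 dst=8.8.8.8".getBytes(StandardCharsets.UTF_8);
        System.out.println(firstIp(record)); // prints 10.0.0.5 (first match wins)
    }
}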
RandomSampler.java
@@ -12,12 +12,12 @@
 
 package com.cloudera.cyber.test.generator;
 
-import org.apache.flink.api.common.functions.FilterFunction;
-
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.flink.api.common.functions.FilterFunction;
+
 public class RandomSampler<T> implements FilterFunction<T> {
-    private double threatProbability;
+    private final double threatProbability;
 
     public RandomSampler(double threatProbability) {
         this.threatProbability = threatProbability;
     }
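
The diff shows RandomSampler's field and constructor but elides the filter body. Presumably it keeps an element when a ThreadLocalRandom draw falls below threatProbability; a hedged reconstruction under that assumption (not the PR's exact code):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.functions.FilterFunction;

// keeps each element with the given probability; a sketch, not the PR's exact body
public class SamplerSketch<T> implements FilterFunction<T> {
    private final double probability;

    public SamplerSketch(double probability) {
        this.probability = probability;
    }

    @Override
    public boolean filter(T value) {
        // ThreadLocalRandom avoids contention between parallel subtasks
        return ThreadLocalRandom.current().nextDouble() < probability;
    }
}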
Log4j2 configuration (properties file)
@@ -9,11 +9,9 @@
 # either express or implied. Refer to the License for the specific permissions and
 # limitations governing your use of the file.
 #
-
-appender.console.type = Console
-appender.console.name = STDOUT
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
-
-rootLogger.level = warn
-rootLogger.appenderRef.stdout.ref = STDOUT
+appender.console.type=Console
+appender.console.name=STDOUT
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
+rootLogger.level=warn
+rootLogger.appenderRef.stdout.ref=STDOUT
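
This configuration wires a single console appender at warn level, so only warn and above reach STDOUT in the given pattern. A quick way to see the effect, assuming log4j-api and log4j-core on the classpath (the class name is illustrative):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class LoggingSketch {
    private static final Logger LOG = LogManager.getLogger(LoggingSketch.class);

    public static void main(String[] args) {
        LOG.info("suppressed: rootLogger.level=warn filters info and below");
        LOG.warn("printed to STDOUT in the HH:mm:ss.SSS [thread] WARN logger - message pattern");
    }
}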