diff --git a/build.gradle b/build.gradle
index 4b7e6e4f4..416b77683 100644
--- a/build.gradle
+++ b/build.gradle
@@ -100,8 +100,8 @@ dependencies {
     implementation platform('com.google.cloud:libraries-bom:20.5.0')
     implementation 'com.google.cloud:google-cloud-storage:2.20.1'
     implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
-    implementation group: 'com.gotocompany', name: 'depot', version: '0.9.2'
     implementation group: 'com.aliyun.oss', name: 'aliyun-sdk-oss', version: '3.18.1'
+    implementation group: 'com.gotocompany', name: 'depot', version: '0.10.0'
     implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59'
         exclude group: 'org.slf4j'
     implementation 'dev.cel:cel:0.5.2'
diff --git a/docs/docs/sinks/maxcompute-sink.md b/docs/docs/sinks/maxcompute-sink.md
new file mode 100644
index 000000000..b30482c68
--- /dev/null
+++ b/docs/docs/sinks/maxcompute-sink.md
@@ -0,0 +1,55 @@
+# MaxCompute sink
+
+### Datatype Protobuf
+
+The MaxCompute sink has several responsibilities, including:
+
+1. Creating the MaxCompute table if it does not exist.
+2. Updating the MaxCompute table schema based on the latest protobuf schema.
+3. Translating protobuf messages into MaxCompute-compatible records and inserting them into MaxCompute tables.
+
+## MaxCompute Table Schema Update
+
+### Protobuf
+
+The MaxCompute sink updates the MaxCompute table schema in a separate table update operation. The sink
+uses [Stencil](https://github.com/goto/stencil) to parse protobuf messages, generate the schema, and update MaxCompute
+tables with the latest schema.
+The Stencil client periodically reloads its descriptor cache; the table schema is updated after the descriptor
+cache has been refreshed.
+
+#### Supported Protobuf - MaxCompute Table Type Mapping
+
+| Protobuf Type                             | MaxCompute Type               |
+|-------------------------------------------|-------------------------------|
+| bytes                                     | BINARY                        |
+| string                                    | STRING                        |
+| enum                                      | STRING                        |
+| float                                     | FLOAT                         |
+| double                                    | DOUBLE                        |
+| bool                                      | BOOLEAN                       |
+| int64, uint64, fixed64, sfixed64, sint64  | BIGINT                        |
+| int32, uint32, fixed32, sfixed32, sint32  | INT                           |
+| message                                   | STRUCT                        |
+| .google.protobuf.Timestamp                | TIMESTAMP_NTZ                 |
+| .google.protobuf.Struct                   | STRING (JSON-serialised)      |
+| .google.protobuf.Duration                 | STRUCT                        |
+| map                                       | ARRAY<STRUCT<key, value>>     |
+
+## Partitioning
+
+The MaxCompute sink supports creating tables with a partition configuration. It currently supports partitioning on
+primitive fields (STRING, TINYINT, SMALLINT, BIGINT) and on timestamp fields. The timestamp-based partitioning
+strategy introduces a pseudo-partition column whose value is the timestamp field truncated to the start of its day.
+
+## Clustering
+
+The MaxCompute sink currently does not support clustering.
+
+## Metadata
+
+For data quality checks, metadata sometimes needs to be added to the record.
+If `SINK_MAXCOMPUTE_ADD_METADATA_ENABLED` is true, the metadata is added.
+`SINK_MAXCOMPUTE_METADATA_NAMESPACE` names a separate namespace under which the metadata columns are added;
+if the namespace is empty, the metadata columns are added at the root level.
+`SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES` lists the Kafka metadata columns and their types.
+An example of metadata columns that can be added for Kafka records:
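+
+A minimal sketch, mirroring the sample values in `env/local.properties` from this change:
+
+```properties
+# Kafka metadata columns and their types (sample values; adjust per deployment)
+SINK_MAXCOMPUTE_ADD_METADATA_ENABLED=true
+SINK_MAXCOMPUTE_METADATA_NAMESPACE=__kafka_metadata
+SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES=message_timestamp=timestamp,message_topic=string,message_partition=integer,message_offset=long
+```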
diff --git a/env/local.properties b/env/local.properties
index 878dc063c..928218bdf 100644
--- a/env/local.properties
+++ b/env/local.properties
@@ -205,4 +205,21 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
 # SINK_REDIS_TTL_TYPE=DISABLE
 # SINK_REDIS_TTL_VALUE=0
 # SINK_REDIS_DEPLOYMENT_TYPE=Standalone
+#############################################
+
+## MaxCompute Sink
+
+SINK_MAXCOMPUTE_ODPS_URL=http://service.ap-southeast-5.maxcompute.aliyun.com/api
+SINK_MAXCOMPUTE_TUNNEL_URL=http://dt.ap-southeast-5.maxcompute.aliyun.com
+SINK_MAXCOMPUTE_ACCESS_ID=
+SINK_MAXCOMPUTE_ACCESS_KEY=
+SINK_MAXCOMPUTE_PROJECT_ID=your_project_id
+SINK_MAXCOMPUTE_SCHEMA=default
+SINK_MAXCOMPUTE_METADATA_NAMESPACE=__kafka_metadata
+SINK_MAXCOMPUTE_ADD_METADATA_ENABLED=true
+SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES=message_timestamp=timestamp,message_topic=string,message_partition=integer,message_offset=long
+SINK_MAXCOMPUTE_TABLE_PARTITIONING_ENABLE=true
+SINK_MAXCOMPUTE_TABLE_PARTITION_KEY=event_timestamp
+SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME=__partition_key
+SINK_MAXCOMPUTE_TABLE_NAME=table_name
diff --git a/src/main/java/com/gotocompany/firehose/config/enums/SinkType.java b/src/main/java/com/gotocompany/firehose/config/enums/SinkType.java
index bbc93e095..5e4676b67 100644
--- a/src/main/java/com/gotocompany/firehose/config/enums/SinkType.java
+++ b/src/main/java/com/gotocompany/firehose/config/enums/SinkType.java
@@ -19,5 +19,6 @@ public enum SinkType {
     BLOB,
     BIGQUERY,
     BIGTABLE,
-    MONGODB
+    MONGODB,
+    MAXCOMPUTE
 }
diff --git a/src/main/java/com/gotocompany/firehose/sink/SinkFactory.java b/src/main/java/com/gotocompany/firehose/sink/SinkFactory.java
index 9305e57a1..5003dc697 100644
--- a/src/main/java/com/gotocompany/firehose/sink/SinkFactory.java
+++ b/src/main/java/com/gotocompany/firehose/sink/SinkFactory.java
@@ -11,6 +11,8 @@
 import com.gotocompany.depot.http.HttpSink;
 import com.gotocompany.depot.log.LogSink;
 import com.gotocompany.depot.log.LogSinkFactory;
+import com.gotocompany.depot.maxcompute.MaxComputeSink;
+import com.gotocompany.depot.maxcompute.MaxComputeSinkFactory;
 import com.gotocompany.depot.metrics.StatsDReporter;
 import com.gotocompany.depot.redis.RedisSink;
 import com.gotocompany.depot.redis.RedisSinkFactory;
@@ -46,6 +48,7 @@ public class SinkFactory {
     private LogSinkFactory logSinkFactory;
     private RedisSinkFactory redisSinkFactory;
     private com.gotocompany.depot.http.HttpSinkFactory httpv2SinkFactory;
+    private MaxComputeSinkFactory maxComputeSinkFactory;

     public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig,
                        StatsDReporter statsDReporter,
@@ -104,6 +107,10 @@ public void init() {
                         statsDReporter);
                 httpv2SinkFactory.init();
                 return;
+            case MAXCOMPUTE:
+                maxComputeSinkFactory = new MaxComputeSinkFactory(statsDReporter, stencilClient, config);
+                maxComputeSinkFactory.init();
+                return;
             default:
                 throw new ConfigurationException("Invalid Firehose SINK_TYPE");
         }
@@ -139,6 +146,8 @@ public Sink getSink() {
                 return MongoSinkFactory.create(config, statsDReporter, stencilClient);
             case HTTPV2:
                 return new GenericSink(new FirehoseInstrumentation(statsDReporter, HttpSink.class), sinkType.name(), httpv2SinkFactory.create());
+            case MAXCOMPUTE:
+                return new GenericSink(new FirehoseInstrumentation(statsDReporter, MaxComputeSink.class), sinkType.name(), maxComputeSinkFactory.create());
             default:
                 throw new ConfigurationException("Invalid Firehose SINK_TYPE");
         }
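Usage sketch: with the sample `env/local.properties` above, routing Firehose to the new sink should only require selecting the sink type. The lowercase value below is an assumption following the convention used to select the existing sinks, and the proto class is a hypothetical placeholder:

```properties
# Route Firehose to the MaxCompute sink added in SinkType.java
# (lowercase value assumed, matching how sinks such as log/bigquery are selected)
SINK_TYPE=maxcompute
# Hypothetical proto class for the consumed topic; replace with a real schema class
INPUT_SCHEMA_PROTO_CLASS=com.example.SampleMessage
```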