Skip to content

Commit

Permalink
Merge branch 'main' into feat-ali-oss-dlq
Browse files Browse the repository at this point in the history
  • Loading branch information
ekawinataa authored Dec 6, 2024
2 parents 0f5b23d + 22d0afb commit a653cf4
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 2 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ dependencies {
implementation platform('com.google.cloud:libraries-bom:20.5.0')
implementation 'com.google.cloud:google-cloud-storage:2.20.1'
implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
implementation group: 'com.gotocompany', name: 'depot', version: '0.9.2'
implementation group: 'com.aliyun.oss', name: 'aliyun-sdk-oss', version: '3.18.1'
implementation group: 'com.gotocompany', name: 'depot', version: '0.10.0'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'
implementation 'dev.cel:cel:0.5.2'

Expand Down
55 changes: 55 additions & 0 deletions docs/docs/sinks/maxcompute-sink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# MaxCompute sink

### Datatype Protobuf

MaxCompute sink has several responsibilities, including :

1. Creation of MaxCompute table if it does not exist.
2. Updating the MaxCompute table schema based on the latest protobuf schema.
3. Translating protobuf messages into MaxCompute compatible records and inserting them into MaxCompute tables.

## MaxCompute Table Schema Update

### Protobuf

MaxCompute Sink update the MaxCompute table schema on separate table update operation. MaxCompute
utilise [Stencil](https://github.com/goto/stencil) to parse protobuf messages generate schema and update MaxCompute
tables with the latest schema.
The stencil client periodically reload the descriptor cache. Table schema update happened after the descriptor caches
uploaded.

#### Supported Protobuf - MaxCompute Table Type Mapping

| Protobuf Type | MaxCompute Type |
|------------------------------------------|-------------------------------|
| bytes | BINARY |
| string | STRING |
| enum | STRING |
| float | FLOAT |
| double | DOUBLE |
| bool | BOOLEAN |
| int64, uint64, fixed64, sfixed64, sint64 | BIGINT |
| int32, uint32, fixed32, sfixed32, sint32 | INT |
| message | STRUCT |
| .google.protobuf.Timestamp | TIMESTAMP_NTZ |
| .google.protobuf.Struct | STRING (Json Serialised) |
| .google.protobuf.Duration | STRUCT |
| map<k,v> | ARRAY<STRUCT<key:k, value:v>> |

## Partitioning

MaxCompute Sink supports creation of table with partition configuration. Currently, MaxCompute Sink supports primitive field(STRING, TINYINT, SMALLINT, BIGINT)
and timestamp field based partitioning. Timestamp based partitioning strategy introduces a pseudo-partition column with the value of the timestamp field truncated to the nearest start of day.

## Clustering

MaxCompute Sink currently does not support clustering.

## Metadata

For data quality checking purposes, sometimes some metadata need to be added on the record.
if `SINK_MAXCOMPUTE_ADD_METADATA_ENABLED` is true then the metadata will be added.
`SINK_MAXCOMPUTE_METADATA_NAMESPACE` is used for another namespace to add columns
if namespace is empty, the metadata columns will be added in the root level.
`SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES` is set with kafka metadata column and their type,
An example of metadata columns that can be added for kafka records.
17 changes: 17 additions & 0 deletions env/local.properties
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,21 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
# SINK_REDIS_TTL_TYPE=DISABLE
# SINK_REDIS_TTL_VALUE=0
# SINK_REDIS_DEPLOYMENT_TYPE=Standalone
#############################################
#
## MaxCompute Sink
#
SINK_MAXCOMPUTE_ODPS_URL=http://service.ap-southeast-5.maxcompute.aliyun.com/api
SINK_MAXCOMPUTE_TUNNEL_URL=http://dt.ap-southeast-5.maxcompute.aliyun.com
SINK_MAXCOMPUTE_ACCESS_ID=
SINK_MAXCOMPUTE_ACCESS_KEY=
SINK_MAXCOMPUTE_PROJECT_ID=your_project_id
SINK_MAXCOMPUTE_SCHEMA=default
SINK_MAXCOMPUTE_METADATA_NAMESPACE=__kafka_metadata
SINK_MAXCOMPUTE_ADD_METADATA_ENABLED=true
SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES=message_timestamp=timestamp,message_topic=string,message_partition=integer,message_offset=long
SINK_MAXCOMPUTE_TABLE_PARTITIONING_ENABLE=true
SINK_MAXCOMPUTE_TABLE_PARTITION_KEY=event_timestamp
SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME=__partition_key
SINK_MAXCOMPUTE_TABLE_NAME=table_name

Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ public enum SinkType {
BLOB,
BIGQUERY,
BIGTABLE,
MONGODB
MONGODB,
MAXCOMPUTE
}
9 changes: 9 additions & 0 deletions src/main/java/com/gotocompany/firehose/sink/SinkFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.gotocompany.depot.http.HttpSink;
import com.gotocompany.depot.log.LogSink;
import com.gotocompany.depot.log.LogSinkFactory;
import com.gotocompany.depot.maxcompute.MaxComputeSink;
import com.gotocompany.depot.maxcompute.MaxComputeSinkFactory;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.depot.redis.RedisSink;
import com.gotocompany.depot.redis.RedisSinkFactory;
Expand Down Expand Up @@ -46,6 +48,7 @@ public class SinkFactory {
private LogSinkFactory logSinkFactory;
private RedisSinkFactory redisSinkFactory;
private com.gotocompany.depot.http.HttpSinkFactory httpv2SinkFactory;
private MaxComputeSinkFactory maxComputeSinkFactory;

public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig,
StatsDReporter statsDReporter,
Expand Down Expand Up @@ -104,6 +107,10 @@ public void init() {
statsDReporter);
httpv2SinkFactory.init();
return;
case MAXCOMPUTE:
maxComputeSinkFactory = new MaxComputeSinkFactory(statsDReporter, stencilClient, config);
maxComputeSinkFactory.init();
return;
default:
throw new ConfigurationException("Invalid Firehose SINK_TYPE");
}
Expand Down Expand Up @@ -139,6 +146,8 @@ public Sink getSink() {
return MongoSinkFactory.create(config, statsDReporter, stencilClient);
case HTTPV2:
return new GenericSink(new FirehoseInstrumentation(statsDReporter, HttpSink.class), sinkType.name(), httpv2SinkFactory.create());
case MAXCOMPUTE:
return new GenericSink(new FirehoseInstrumentation(statsDReporter, MaxComputeSink.class), sinkType.name(), maxComputeSinkFactory.create());
default:
throw new ConfigurationException("Invalid Firehose SINK_TYPE");
}
Expand Down

0 comments on commit a653cf4

Please sign in to comment.