feat: Max Compute Sink (#52)
* feat: Add Type Info Converter Implementation

* feat: Add Test

* feat: Complete test for MessageTypeInfoConverterTest

* feat: Complete test for TimestampTypeInfoConverter

* feat: Complete test for StructTypeInfoConverter.java

* feat: Complete test for DurationTypeInfoConverter.java

* feat: Complete test for BaseTypeInfoConverterTest.java

* feat: get implementation diff from feat branch

* test: update ConverterOrchestratorTest

* test: Add InsertManagerFactoryTest

* chore: remove unused constructor

* test:
- add DefaultPartitioningStrategyTest
- add TimestampPartitioningStrategyTest
- add PartitioningStrategyFactoryTest

* fix: PartitioningStrategyFactoryTest

* fix: remove unused dependency injection

* test: add non null injected decorator

* test: add test for MaxComputeSchemaCache

* chore: remove unused lombok annotations

* test: Add MaxComputeSinkTest

* test: add test for close

* test: fix typeinfo

* fix: add error handler in MaxComputeSink

* add ProtoUnknownFieldValidationType

* test: Add test for ProtoUnknownFieldValidationType

* fix: use correct converter

* fix: rename method

* chore: rename

* chore: exclude conflicting deps

* chore: fix checkstyle main

* chore: fix checkstyle test

* chore: fix dependencies issue

* chore: remove main

* fix: fix test

* chore: exclude model package and bump version

* feat: add max compute metrics

* feat: add compression option

* fix: fix ordering on instantiation

* feat: Add utils and implement schema update

* feat: Add logging when executing SQL

* chore: checkstyle main

* chore: fix wrong params

* feat: add converter for MaxComputeCompressionAlgorithmConverter

* chore: fix checkstyle

* fix: byte array conversion

* test: fix test

* fix: remove record reordering, put metadata at the beginning

* fix: use timestamp_ntz for proto timestamp and partition

* fix: adjust unit test to use timestamp_ntz instead of timestamp

* chore: remove unused lombok annotations

* chore: exclude maxcompute client and factory from coverage

* chore: exclude maxcompute client from coverage check

* chore: delete unused proto

* test: add test case for log key

* chore: remove unused getters

* test: add compression option test

* chore: remove redundant column field

* fix: duration converter cast to message

* feat: add partition precondition on MaxComputeClient

* feat: add allowSchemaMismatch to false

* chore: remove unused config

* chore: adjust config name

* chore: add sink and config docs

* chore: docs layout

* chore: docs layout + adjust default value of config

* fix: revert mistakenly pushed build gradle change

* chore: fix magic constant

* fix: set null instead of default value to nonexistent field

* chore: fix partitioning strategy

* test: complete unit test

* feat: add configurable date format for timestamp partition key

* chore: change default value and docs

* Maxcompute instrumentation (#55)

* added instrumentation for MC sink

* chore: refactor InsertManager to abstract class

* chore: revert spacing change

* chore: refactor

---------

Co-authored-by: Eka Winata <[email protected]>

* feat: add zone offset for timestamp

* feat: use zone offset config for metadata util

* feat: Use ZoneId instead of ZoneOffset

* feat: use ali auto partitioning strategy [todo fix ProtoDataColumnRecordDecoratorTest]

* test: fix ProtoDataColumnRecordDecoratorTest

* chore: remove unused config

* fix: set MaxComputeCache when partitioningStrategy is not null

* chore: checkstyle

* feat: add checking on sql result

* fix: metadata util

* chore: checkstyle

* fix: add validation for enum based config and make it fail fast on initialization

* chore: fix exception type

* chore: checkstyle

* fix: class naming and add validation for enum value

* chore: Fix MetadataUtil to accept List of TupleString

* chore: switch metadata ordering

* - refactor schema cache to factory method
- fix schema cache to fetch from metadata server

* Fix MaxComputeSchemaCache and its test

* chore: if not exists true table creation parameter

* - checkstyle SinkConfigUtils
- add test for SinkConfigUtils

* - add retry utils
- separate DDL operation to DdlManager

* test: Add test for RetryUtils.java

* feat: add global configs as externalized parameter

* feat: refactor streaming management to StreamingSessionManager class

* feat: externalize partition time unit configuration

* fix: refactor TimestampPartitioningStrategy

* test: add case when passed object is not Record

* chore: remove null checking since message couldn't be null

* chore: add synchronization on schema update method

* chore: checkstyle main

* chore: checkstyle main

* chore: update docs

* chore: update schema docs

* chore: reorder annotation in config

* chore: change version to 0.10.0

* chore: bump aliyun version

* chore: remove redundant enum converter class

* feat: use guava cache for streaming session

* chore: use sessionCache.getUnchecked

* test: update test

* chore: checkstyle

* chore: use const instead of literal string

* chore: wrap update statement with backtick

* test: add IOException use case

* chore: refactor RetryUtils to receive exception predicate

* chore: remove validateConfig()

* fix: make RecordWrapper immutable

* chore: optimize variable declaration on MaxComputeOdpsGlobalSettingsConverter

* fix: remove unused TableTunnel dependencies from InsertManager.java and its implementation

* fix: setup FlushOption on the constructor for InsertManager

* fix: rename static factory method of streaming session manager

* chore: add javadoc for PayloadConverter

* chore: refactor StreamingSessionManager to receive LoadingCache in its constructor

* chore: Refactor PrimitiveTypeInfoConverter to use ImmutableMap

* chore: Rename ConverterOrchestrator.java to ProtobufConverterOrchestrator.java

* chore: refactor name and type info to static var

* chore: move MaxComputeSchemaHelper to root MaxCompute package

* chore: rename getDdlDeclaration to getDDL

* chore: remove builder on MaxComputeSchema

* chore: fix indent

* refactor: MaxComputeSchemaCache to separate handling between non-null and null descriptor

* refactor: use google Sets on PartitioningStrategyFactory

* refactor: make TableValidator limit configurable

* refactor: use immutable map on MetadataUtil

* refactor: remove SinkConfigUtils.java and its test. Transfer the implementation of util method directly in class using it

* chore: add static imports for assertions lib

* chore: rename PayloadConverter.java to ProtobufPayloadConverter

* feat: add validation for timestamp type

* chore: update docs and default value for valid min and max timestamp

* refactor: wrap payload converter to work with DTO

* chore: set default maximum session count to 2 for SINK_MAXCOMPUTE_STREAMING_INSERT_MAXIMUM_SESSION_COUNT

* test: fix assertion

* feat: add timestamp difference validation

* feat: Add NaN and Infinite check float and double

* chore: rename upsertTable to createOrUpdateTable

* chore: refactor inline lambda to static method

* chore: update docs

* fix: use server side schema for building metadata

* chore: add docs for SINK_CONNECTOR_SCHEMA_PROTO_UNKNOWN_FIELDS_VALIDATION

* chore: rename MaxComputeSchemaHelper.java to MaxComputeSchemaBuilder

* refactor: Add method to add uniform errors in SinkResponse and apply it to MaxComputeSink

* feat: parameterized SINK_MAXCOMPUTE_STREAMING_INSERT_TUNNEL_SLOT_COUNT_PER_SESSION

* chore: update generic.md to use more explicit context

* chore: update maxcompute.md to show the correct type mapping

* chore: rename object to parsedObject in ProtoPayload.java

* chore: refactor `convert` method under ProtobufConverterOrchestrator to toMaxComputeTypeInfo and toMaxComputeValue

* chore: add javadoc for ProtobufTypeInfoConverter.java

* chore: checkstyle

* chore: add static import

* chore: remove extraneous space

---------

Co-authored-by: Vaishnavi190900 <[email protected]>
ekawinataa and Vaishnavi190900 authored Dec 6, 2024
1 parent ef469aa commit 707682c
Showing 101 changed files with 7,527 additions and 14 deletions.
15 changes: 13 additions & 2 deletions build.gradle
@@ -22,13 +22,15 @@ plugins {
}

group 'com.gotocompany'
version '0.9.2'
version '0.10.0'

repositories {
mavenLocal()
mavenCentral()
}

configurations.configureEach { exclude group: 'com.google.guava', module: 'listenablefuture' }

dependencies {
implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.25.0'
implementation group: 'com.datadoghq', name: 'java-dogstatsd-client', version: '2.13.0'
@@ -44,6 +46,10 @@ dependencies {
implementation(group: 'com.google.cloud', name: 'google-cloud-bigtable', version: '2.24.1') {
exclude group: "io.grpc"
}
implementation (group: 'com.aliyun.odps', name: 'odps-sdk-core', version: '0.51.0-public.rc1') {
exclude group: "com.google"
exclude group: "io.grpc"
}
implementation 'io.grpc:grpc-all:1.55.1'
implementation group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.35'
implementation group: 'redis.clients', name: 'jedis', version: '3.10.0'
@@ -53,7 +59,9 @@ dependencies {
implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.8.0'
implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.1'
implementation group: 'joda-time', name: 'joda-time', version: '2.10.2'
implementation('com.google.guava:guava:32.0.1-jre') { force = true }
testImplementation group: 'junit', name: 'junit', version: '4.13.1'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.26.3'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.mockito:mockito-core:4.5.1'
testImplementation 'com.github.tomakehurst:wiremock:2.16.0'
@@ -203,7 +211,10 @@ jacocoTestCoverageVerification {
'**/serializer/**',
'**/cortexpb/**',
'**/Clock**',
'**/GoGoProtos**',])
'**/GoGoProtos**',
'**/MaxComputeClient**',
'**/MaxComputeSinkFactory**',
''])
})
}
violationRules {
53 changes: 52 additions & 1 deletion docs/reference/configuration/generic.md
@@ -47,6 +47,57 @@ Allow unknown fields in proto schema
* Type: `required`
* Default: `false`

## `SINK_CONNECTOR_SCHEMA_PROTO_UNKNOWN_FIELDS_VALIDATION`

This configuration is used in conjunction with `SINK_CONNECTOR_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE`. When that flag is set to `true`, this configuration controls how unknown fields in the proto message are validated.
Supported values are `MESSAGE`, `MESSAGE_ARRAY_FIRST_INDEX`, and `MESSAGE_ARRAY_FULL`. The check is applied recursively to nested messages.
The choice of value is a trade-off between performance and strong consistency.

Use cases:
* `MESSAGE` - Only check non-repeated message fields.
* `MESSAGE_ARRAY_FIRST_INDEX` - Check any message type and the first element of repeated message fields.
* `MESSAGE_ARRAY_FULL` - Check any message type and all elements of repeated message fields.

Scenario:
* schema :
```
message Person {
string name = 1;
int32 age = 2;
repeated Pet pets = 3;
}
message Pet {
string name = 1;
int32 age = 2;
}
```
* payload :
```
{
"name": "person1",
"age": 10,
"pets": [
{
"name": "dog1",
"age": 2
},
{
"name": "dog2",
"age": 3,
"color": "brown"
}
]
}
```
* `MESSAGE` - The unknown field `color` in the repeated message field `pets` is not checked, so validation passes.
* `MESSAGE_ARRAY_FIRST_INDEX` - Validation returns true since the unknown field `color` is not present in the first element of the repeated message field `pets`.
* `MESSAGE_ARRAY_FULL` - All elements of the repeated message field `pets` are checked for the unknown field `color`. Validation returns false since the unknown field is present in the second element.

* Example value: `MESSAGE`
* Type: `required`
* Default: `MESSAGE`
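The scenario above can be sketched with a toy model. The `Node` and `ValidationType` types here are purely illustrative assumptions; the real sink walks protobuf messages and their unknown-field sets, not this class:

```java
import java.util.List;

// Toy model used only to illustrate the three validation modes.
enum ValidationType { MESSAGE, MESSAGE_ARRAY_FIRST_INDEX, MESSAGE_ARRAY_FULL }

record Node(boolean hasUnknownFields,
            List<Node> messageFields,          // singular nested messages
            List<List<Node>> repeatedFields) { // repeated nested messages

    // Returns true when no unknown field is found within the scope that
    // the chosen validation mode inspects.
    static boolean validate(Node node, ValidationType type) {
        if (node.hasUnknownFields()) {
            return false;
        }
        for (Node child : node.messageFields()) {      // always recurse into
            if (!validate(child, type)) {              // non-repeated messages
                return false;
            }
        }
        for (List<Node> repeated : node.repeatedFields()) {
            if (type == ValidationType.MESSAGE_ARRAY_FIRST_INDEX && !repeated.isEmpty()) {
                if (!validate(repeated.get(0), type)) {
                    return false;
                }
            } else if (type == ValidationType.MESSAGE_ARRAY_FULL) {
                for (Node child : repeated) {
                    if (!validate(child, type)) {
                        return false;
                    }
                }
            } // MESSAGE: repeated message fields are skipped entirely
        }
        return true;
    }
}
```

With the `Person`/`Pet` payload above, `MESSAGE` and `MESSAGE_ARRAY_FIRST_INDEX` pass while `MESSAGE_ARRAY_FULL` fails, since only the second `pets` element carries the unknown `color` field.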


## `SINK_ADD_METADATA_ENABLED`

Defines whether to add Kafka metadata fields like topic, partition, offset, and timestamp to the input proto message.
@@ -90,4 +141,4 @@ Port of the StatsD host
Global tags for StatsD metrics. Tags must be comma-separated.

* Example value: `team=engineering,app=myapp`
* Type: `optional`
* Type: `optional`
253 changes: 253 additions & 0 deletions docs/reference/configuration/maxcompute.md
@@ -0,0 +1,253 @@
# MaxCompute Sink

A MaxCompute sink requires the following configurations to be passed in alongside the generic ones.

## SINK_MAXCOMPUTE_ODPS_URL

Contains the URL of the MaxCompute endpoint. Further documentation on MaxCompute [ODPS URL](https://www.alibabacloud.com/help/en/maxcompute/user-guide/endpoints).
* Example value: `http://service.ap-southeast-5.maxcompute.aliyun.com/api`
* Type: `required`

## SINK_MAXCOMPUTE_ACCESS_ID

Contains the access id of the MaxCompute project. Further documentation on MaxCompute [Access ID](https://www.alibabacloud.com/help/en/tablestore/support/obtain-an-accesskey-pair).
* Example value: `access-id`
* Type: `required`

## SINK_MAXCOMPUTE_ACCESS_KEY

Contains the access key of the MaxCompute project. Further documentation on MaxCompute [Access Key](https://www.alibabacloud.com/help/en/tablestore/support/obtain-an-accesskey-pair).
* Example value: `access-key`
* Type: `required`

## SINK_MAXCOMPUTE_PROJECT_ID

Contains the identifier of a MaxCompute project. Further documentation on MaxCompute [Project ID](https://www.alibabacloud.com/help/en/maxcompute/product-overview/project).
* Example value: `project-id`
* Type: `required`

## SINK_MAXCOMPUTE_ADD_METADATA_ENABLED

Configuration for enabling metadata on top of the record. When enabled, metadata information is added to the record as key-value pairs.
* Example value: `false`
* Type: `required`
* Default value: `true`

## SINK_MAXCOMPUTE_METADATA_NAMESPACE

Configuration for wrapping the metadata fields under a specific namespace. This results in the metadata fields being nested inside a struct.
An empty string results in the metadata fields being added directly at the root level.
* Example value: `__kafka_metadata`
* Type: `optional`

## SINK_MAXCOMPUTE_METADATA_COLUMNS_TYPES

Configuration for defining the metadata columns and their types. The format of this config is `column1=type1,column2=type2`.
Supported types are `string`, `integer`, `long`, `timestamp`, `float`, `double`, `boolean`.

* Example value: `topic=string,partition=integer,offset=integer,timestamp=timestamp`
* Type: `optional`

## SINK_MAXCOMPUTE_SCHEMA

Contains the schema of the MaxCompute table. A schema groups tables within a MaxCompute project. Further documentation on MaxCompute [Schema](https://www.alibabacloud.com/help/en/maxcompute/user-guide/schemas).
* Example value: `your_dataset_name`
* Type: `required`
* Default value: `default`

## SINK_MAXCOMPUTE_TABLE_PARTITIONING_ENABLE

Configuration for enabling partitioning in the MaxCompute table.
* Example value: `true`
* Type: `required`
* Default value: `false`

## SINK_MAXCOMPUTE_TABLE_PARTITION_KEY

Contains the partition key of the MaxCompute table. The partition key refers to the payload field that is used as the partition key in the MaxCompute table.
Supported MaxCompute types for the partition key are `string`, `tinyint`, `smallint`, `int`, `bigint`, `timestamp_ntz`.
* Example value: `column1`
* Type: `optional`
* Default value: `default`

## SINK_MAXCOMPUTE_TABLE_PARTITION_BY_TIMESTAMP_TIME_UNIT

Contains the time unit used when partitioning by timestamp.
Supported time units are `YEAR`, `MONTH`, `DAY`, `HOUR`. The configuration is case-sensitive.

* Example value: `DAYS`
* Type: `required`
* Default value: `DAYS`
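As an illustration of how a time unit maps an event timestamp to a partition value, the sketch below derives one string per unit. The class name and pattern strings are assumptions for illustration, not the sink's actual partition spec:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: maps an event time to a partition value for the
// configured time unit (YEAR, MONTH, DAY, HOUR).
final class PartitionValueSketch {
    static String partitionValue(Instant eventTime, String timeUnit, ZoneId zoneId) {
        ZonedDateTime zdt = eventTime.atZone(zoneId);
        DateTimeFormatter fmt = switch (timeUnit) {
            case "YEAR"  -> DateTimeFormatter.ofPattern("yyyy");
            case "MONTH" -> DateTimeFormatter.ofPattern("yyyy-MM");
            case "DAY"   -> DateTimeFormatter.ofPattern("yyyy-MM-dd");
            case "HOUR"  -> DateTimeFormatter.ofPattern("yyyy-MM-dd HH");
            default -> throw new IllegalArgumentException("unsupported unit: " + timeUnit);
        };
        return zdt.format(fmt);
    }
}
```

For example, a `DAY` unit collapses every event within the same calendar day (in the configured zone) into one partition value.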

## SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME

Contains the partition column name of the MaxCompute table. This could be the same as the partition key or different; it determines the column name in the MaxCompute table.
SINK_MAXCOMPUTE_TABLE_PARTITION_COLUMN_NAME is kept separate from SINK_MAXCOMPUTE_TABLE_PARTITION_KEY to allow the user to use a different column name in the MaxCompute table.
This is used for the timestamp auto-partitioning feature, where the partition column coexists with the original column.

* Example value: `column1`
* Type: `optional`

## SINK_MAXCOMPUTE_TABLE_NAME

Contains the name of the MaxCompute table. Further documentation on MaxCompute [Table Name](https://www.alibabacloud.com/help/en/maxcompute/user-guide/tables).
* Example value: `table_name`
* Type: `required`

## SINK_MAXCOMPUTE_TABLE_LIFECYCLE_DAYS

Contains the lifecycle of the MaxCompute table in days.
Leaving this config unset results in a table without a lifecycle, meaning data is retained indefinitely. Lifecycle is applied at the partition level. Further documentation on MaxCompute [Table Lifecycle](https://www.alibabacloud.com/help/en/maxcompute/product-overview/lifecycle).
* Example value: `30`
* Type: `optional`

## SINK_MAXCOMPUTE_RECORD_PACK_FLUSH_TIMEOUT_MS

Contains the timeout for flushing the record pack, in milliseconds. A negative value indicates no timeout.
* Example value: `1000`
* Type: `required`
* Default value: `-1`

## SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_ENABLED

Configuration for enabling compression in the streaming insert operation.
* Example value: `false`
* Type: `required`
* Default value: `true`

## SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_ALGORITHM

Configuration for defining the compression algorithm used in the streaming insert operation.
Supported values are `ODPS_RAW`, `ODPS_ZLIB`, `ODPS_LZ4_FRAME`, `ODPS_ARROW_LZ4_FRAME`, `ODPS_ARROW_ZSTD`.
* Example value: `ODPS_ZLIB`
* Type: `required`
* Default value: `ODPS_LZ4_FRAME`

## SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_LEVEL

Configuration for defining the compression level used in the streaming insert operation.
Further documentation on MaxCompute [Compression](https://www.alibabacloud.com/help/en/maxcompute/user-guide/sdk-interfaces#section-cg2-7mb-849).
* Example value: `1`
* Type: `required`
* Default value: `1`

## SINK_MAXCOMPUTE_STREAMING_INSERT_COMPRESSION_STRATEGY

Configuration for defining the compression strategy used in the streaming insert operation.
Further documentation on MaxCompute [Compression](https://www.alibabacloud.com/help/en/maxcompute/user-guide/sdk-interfaces#section-cg2-7mb-849).

* Example value: `1`
* Type: `required`
* Default value: `0`

## SINK_MAXCOMPUTE_STREAMING_INSERT_MAXIMUM_SESSION_COUNT

Contains the maximum number of cached sessions for the streaming insert operation.
The least recently used session is evicted when the cache is full.

* Example value: `7`
* Type: `required`
* Default value: `2`
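The least-recently-used eviction described above can be sketched with a plain access-ordered `LinkedHashMap`. This is an illustrative stand-in only; per the commit history the sink itself uses a Guava `LoadingCache`:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of an LRU session cache: once more than
// `maximumSessionCount` sessions are cached, the least recently used
// one is evicted.
final class SessionCacheSketch<K, V> {
    private final Map<K, V> cache;
    private final Function<K, V> loader;

    SessionCacheSketch(int maximumSessionCount, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder=true makes iteration order reflect recency of use
        this.cache = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maximumSessionCount;
            }
        };
    }

    synchronized V get(K key) {
        V value = cache.get(key);
        if (value == null) {
            value = loader.apply(key);  // create session on cache miss
            cache.put(key, value);
        }
        return value;
    }

    synchronized int size() {
        return cache.size();
    }
}
```

With a capacity of 2, touching sessions `a`, `b`, `a`, `c` in that order evicts `b`, since `a` was used more recently.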

## SINK_MAXCOMPUTE_STREAMING_INSERT_TUNNEL_SLOT_COUNT_PER_SESSION

Contains the slot count per session for the streaming insert operation.
Check the official documentation on [tunnel slots](https://www.alibabacloud.com/help/en/maxcompute/user-guide/overview-of-dts) for more information.

* Example value: `2`
* Type: `required`
* Default value: `1`

## SINK_MAXCOMPUTE_ZONE_ID

Contains the zone ID used for parsing timestamps in the record.

* Example value: `Asia/Bangkok`
* Type: `required`
* Default value: `Asia/Bangkok`

## SINK_MAXCOMPUTE_MAX_DDL_RETRY_COUNT

Contains the maximum retry count for DDL operations (create and update table schema).

* Example value: `3`
* Type: `required`
* Default value: `3`

## SINK_MAXCOMPUTE_DDL_RETRY_BACKOFF_MILLIS

Contains the backoff time in milliseconds between retries of DDL operations (create and update table schema).

* Example value: `10000`
* Type: `required`
* Default value: `1000`
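A minimal sketch of the retry-with-backoff behavior these two configs describe. Names and signature are hypothetical; per the commit history the sink's `RetryUtils` takes an exception predicate to decide which failures are retryable:

```java
import java.util.function.Predicate;

// Hypothetical retry helper: rerun the action up to `maxRetryCount`
// additional times, sleeping `backoffMillis` between attempts, but only
// while the thrown exception matches the retryable predicate.
final class RetryUtilsSketch {
    static void executeWithRetry(Runnable action,
                                 int maxRetryCount,
                                 long backoffMillis,
                                 Predicate<RuntimeException> retryable) {
        int attempt = 0;
        while (true) {
            try {
                action.run();
                return;
            } catch (RuntimeException e) {
                attempt++;
                if (attempt > maxRetryCount || !retryable.test(e)) {
                    throw e;  // exhausted retries or non-retryable failure
                }
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```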

## SINK_MAXCOMPUTE_ODPS_GLOBAL_SETTINGS

Contains the global settings for the MaxCompute sink. The format of this config is `key1=value1,key2=value2`.

* Example value: `odps.schema.evolution.enable=true,odps.namespace.schema=true,odps.sql.type.system.odps2=true`
* Type: `optional`
* Default value: `odps.schema.evolution.enable=true,odps.namespace.schema=true`
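The `key1=value1,key2=value2` format can be parsed as sketched below. The helper class is illustrative only, not the sink's actual `MaxComputeOdpsGlobalSettingsConverter`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for a comma-separated key=value settings string.
final class GlobalSettingsParser {
    static Map<String, String> parse(String raw) {
        Map<String, String> settings = new LinkedHashMap<>();
        if (raw == null || raw.isBlank()) {
            return settings;
        }
        for (String pair : raw.split(",")) {
            String[] kv = pair.split("=", 2);  // split on first '=' only
            if (kv.length != 2) {
                throw new IllegalArgumentException("malformed entry: " + pair);
            }
            settings.put(kv[0].trim(), kv[1].trim());
        }
        return settings;
    }
}
```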

## SINK_MAXCOMPUTE_TABLE_VALIDATOR_NAME_REGEX

Contains the regex pattern used to validate the table name; the table name must match this pattern.
Check the official documentation on [table limits](https://www.alibabacloud.com/help/en/maxcompute/product-overview/limits-4#:~:text=A%20table%20can%20contain%20a%20maximum%20of%2060%2C000%20partitions.&text=A%20table%20can%20contain%20a%20maximum%20of%20six%20levels%20of%20partitions.&text=A%20SELECT%20statement%20can%20return%20a%20maximum%20of%2010%2C000%20rows.&text=A%20MULTI%2DINSERT%20statement%20allows,tables%20at%20the%20same%20time.) for more information.

* Example value: `^[a-zA-Z_][a-zA-Z0-9_]*$`
* Type: `required`
* Default value: `^[A-Za-z][A-Za-z0-9_]{0,127}$`
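A quick check of a candidate table name against the default regex above (the helper class name is hypothetical):

```java
import java.util.regex.Pattern;

// Validates a table name against the documented default pattern:
// a letter followed by up to 127 letters, digits, or underscores.
final class TableNameValidatorSketch {
    private static final Pattern DEFAULT_NAME_PATTERN =
            Pattern.compile("^[A-Za-z][A-Za-z0-9_]{0,127}$");

    static boolean isValid(String tableName) {
        return tableName != null && DEFAULT_NAME_PATTERN.matcher(tableName).matches();
    }
}
```

Note that the default pattern requires the first character to be a letter, while the example value (`^[a-zA-Z_][a-zA-Z0-9_]*$`) also permits a leading underscore.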

## SINK_MAXCOMPUTE_TABLE_VALIDATOR_MAX_COLUMNS_PER_TABLE

Contains the maximum number of columns allowed in the table.
Check the official documentation on [table limits](https://www.alibabacloud.com/help/en/maxcompute/product-overview/limits-4#:~:text=A%20table%20can%20contain%20a%20maximum%20of%2060%2C000%20partitions.&text=A%20table%20can%20contain%20a%20maximum%20of%20six%20levels%20of%20partitions.&text=A%20SELECT%20statement%20can%20return%20a%20maximum%20of%2010%2C000%20rows.&text=A%20MULTI%2DINSERT%20statement%20allows,tables%20at%20the%20same%20time.) for more information.

* Example value: `1000`
* Type: `required`
* Default value: `1200`

## SINK_MAXCOMPUTE_TABLE_VALIDATOR_MAX_PARTITION_KEYS_PER_TABLE

Contains the maximum number of partition keys allowed in the table.
Check the official documentation on [table limits](https://www.alibabacloud.com/help/en/maxcompute/product-overview/limits-4#:~:text=A%20table%20can%20contain%20a%20maximum%20of%2060%2C000%20partitions.&text=A%20table%20can%20contain%20a%20maximum%20of%20six%20levels%20of%20partitions.&text=A%20SELECT%20statement%20can%20return%20a%20maximum%20of%2010%2C000%20rows.&text=A%20MULTI%2DINSERT%20statement%20allows,tables%20at%20the%20same%20time.) for more information.

* Example value: `6`
* Type: `required`
* Default value: `6`

## SINK_MAXCOMPUTE_VALID_MIN_TIMESTAMP

Contains the minimum valid timestamp. Records with a timestamp field earlier than this value are considered invalid messages.
Timestamps must be in the `yyyy-MM-ddTHH:mm:ss` format.

* Example value: `2020-01-01T00:00:00`
* Type: `required`
* Default value: `1970-01-01T00:00:00`

## SINK_MAXCOMPUTE_VALID_MAX_TIMESTAMP

Contains the maximum valid timestamp. Records with a timestamp field later than this value are considered invalid messages.
Timestamps must be in the `yyyy-MM-ddTHH:mm:ss` format.

* Example value: `2030-12-31T23:59:59`
* Type: `required`
* Default value: `9999-12-31T23:59:59`
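A sketch of the range check these two configs imply. The class is hypothetical; `LocalDateTime.parse` accepts the `yyyy-MM-ddTHH:mm:ss` format directly:

```java
import java.time.LocalDateTime;

// Hypothetical validator: a record timestamp is valid only within the
// configured [VALID_MIN_TIMESTAMP, VALID_MAX_TIMESTAMP] range, inclusive.
final class TimestampRangeValidator {
    private final LocalDateTime min;
    private final LocalDateTime max;

    TimestampRangeValidator(String minTimestamp, String maxTimestamp) {
        this.min = LocalDateTime.parse(minTimestamp);  // ISO local date-time
        this.max = LocalDateTime.parse(maxTimestamp);
    }

    boolean isValid(LocalDateTime eventTime) {
        return !eventTime.isBefore(min) && !eventTime.isAfter(max);
    }
}
```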

## SINK_MAXCOMPUTE_MAX_PAST_EVENT_TIME_DIFFERENCE_YEAR

Contains the maximum past event time difference in years. Records whose event time is further in the past than this value are considered invalid messages.

* Example value: `1`
* Type: `required`
* Default value: `5`

## SINK_MAXCOMPUTE_MAX_FUTURE_EVENT_TIME_DIFFERENCE_YEAR

Contains the maximum future event time difference in years. Records whose event time is further in the future than this value are considered invalid messages.

* Example value: `1`
* Type: `required`
* Default value: `1`
