diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java index 878b8410762..a58febedfb6 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java @@ -375,6 +375,9 @@ public class Constants { public static final ConfigOption SQL_SCRIPT_FILE = key("sql.script.file").noDefaultValue() .withDescription("The file which is sql script and contains multi statement"); + public static final ConfigOption ENABLE_LOG_REPORT = key("enable.log.report").defaultValue(false) + .withDescription("Whether to enable openTelemetry log report or not"); + // ------------------------------------------------------------------------ // File format and compression related // ------------------------------------------------------------------------ diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java index a9744f793cf..1c312273294 100644 --- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java +++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java @@ -77,6 +77,8 @@ public static void main(String[] args) throws Exception { config.getString(Constants.UPSERT_MATERIALIZE)); tableEnv.getConfig().getConfiguration().setString(Constants.TABLE_EXEC_SINK_NOT_NULL_ENFORCER, config.getString(Constants.NOT_NULL_ENFORCER)); + tableEnv.getConfig().getConfiguration().setBoolean(Constants.ENABLE_LOG_REPORT.key(), + config.getBoolean(Constants.ENABLE_LOG_REPORT)); String sqlFile = config.getString(Constants.SQL_SCRIPT_FILE); Parser parser; if (StringUtils.isEmpty(sqlFile)) { diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java index 70e6b2413ed..d6bcca2fa1a 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Kafka2StarRocksTest.java @@ -20,6 +20,7 @@ import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; import org.apache.inlong.sort.tests.utils.JdbcProxy; import org.apache.inlong.sort.tests.utils.MySqlContainer; +import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer; import org.apache.inlong.sort.tests.utils.PlaceholderResolver; import org.apache.inlong.sort.tests.utils.StarRocksContainer; import org.apache.inlong.sort.tests.utils.TestUtils; @@ -34,6 +35,7 @@ import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; import java.io.IOException; import java.net.URI; @@ -111,6 +113,15 @@ public class Kafka2StarRocksTest extends FlinkContainerTestEnvJRE8 { .withNetworkAliases("mysql") .withLogConsumer(new Slf4jLogConsumer(MYSQL_LOG)); + @ClassRule + public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER = + (OpenTelemetryContainer) new OpenTelemetryContainer() + 
.withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"), + "/otel-config.yaml") + .withCommand("--config=/otel-config.yaml") + .withNetwork(NETWORK) + .withNetworkAliases("logcollector"); + @Before public void setup() { waitUntilJobRunning(Duration.ofSeconds(30)); @@ -152,6 +163,10 @@ public static void teardown() { if (STAR_ROCKS != null) { STAR_ROCKS.stop(); } + + if (OPEN_TELEMETRY_CONTAINER != null) { + OPEN_TELEMETRY_CONTAINER.stop(); + } } private void initializeKafkaTable(String topic) { @@ -223,5 +238,10 @@ public void testKafkaWithSqlFile() throws Exception { "test_output1", 3, 60000L); + // check log appender + String logs = OPEN_TELEMETRY_CONTAINER.getLogs(); + if (!logs.contains("OpenTelemetryLogger installed")) { + throw new Exception("Failure to append logs to OpenTelemetry"); + } } } \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java index 6b7a5aa644e..e2867d7f9d2 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Mysql2StarRocksTest.java @@ -20,6 +20,7 @@ import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; import org.apache.inlong.sort.tests.utils.JdbcProxy; import org.apache.inlong.sort.tests.utils.MySqlContainer; +import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer; import org.apache.inlong.sort.tests.utils.StarRocksContainer; import org.apache.inlong.sort.tests.utils.TestUtils; @@ -30,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.MountableFile; import java.net.URISyntaxException; import java.nio.file.Path; @@ -85,6 +87,15 @@ public class Mysql2StarRocksTest extends FlinkContainerTestEnvJRE8 { .withNetworkAliases("mysql") .withLogConsumer(new Slf4jLogConsumer(LOG)); + @ClassRule + public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER = + (OpenTelemetryContainer) new OpenTelemetryContainer() + .withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"), + "/otel-config.yaml") + .withCommand("--config=/otel-config.yaml") + .withNetwork(NETWORK) + .withNetworkAliases("logcollector"); + @Before public void setup() { waitUntilJobRunning(Duration.ofSeconds(30)); @@ -121,6 +132,9 @@ public static void teardown() { if (STAR_ROCKS != null) { STAR_ROCKS.stop(); } + if (OPEN_TELEMETRY_CONTAINER != null) { + OPEN_TELEMETRY_CONTAINER.stop(); + } } /** @@ -161,6 +175,11 @@ public void testMysqlUpdateAndDelete() throws Exception { expectResult, "test_output1", 3, - 60000L); + 80000L); + // check log appender + String logs = OPEN_TELEMETRY_CONTAINER.getLogs(); + if (!logs.contains("OpenTelemetryLogger installed")) { + throw new Exception("Failure to append logs to OpenTelemetry"); + } } } \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java index e5252d0b4a2..9c40577778c 100644 --- 
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2StarRocksTest.java @@ -19,6 +19,7 @@ import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; import org.apache.inlong.sort.tests.utils.JdbcProxy; +import org.apache.inlong.sort.tests.utils.OpenTelemetryContainer; import org.apache.inlong.sort.tests.utils.StarRocksContainer; import org.apache.inlong.sort.tests.utils.TestUtils; @@ -35,6 +36,7 @@ import org.testcontainers.containers.PulsarContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; import java.net.URI; import java.net.URISyntaxException; @@ -91,6 +93,15 @@ public class Pulsar2StarRocksTest extends FlinkContainerTestEnvJRE8 { .withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS) .withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG)); + @ClassRule + public static final OpenTelemetryContainer OPEN_TELEMETRY_CONTAINER = + (OpenTelemetryContainer) new OpenTelemetryContainer() + .withCopyFileToContainer(MountableFile.forClasspathResource("/env/otel-config.yaml"), + "/otel-config.yaml") + .withCommand("--config=/otel-config.yaml") + .withNetwork(NETWORK) + .withNetworkAliases("logcollector"); + @Before public void setup() { waitUntilJobRunning(Duration.ofSeconds(30)); @@ -119,6 +130,9 @@ public static void teardown() { if (STAR_ROCKS != null) { STAR_ROCKS.stop(); } + if (OPEN_TELEMETRY_CONTAINER != null) { + OPEN_TELEMETRY_CONTAINER.stop(); + } } @Test @@ -144,6 +158,11 @@ public void testPulsarToStarRocks() throws Exception { "1,Alice,Hello, Pulsar", "2,Bob,Goodbye, Pulsar"); proxy.checkResultWithTimeout(expectedResult, "test_output1", 3, 60000L); + // check log appender + String logs = OPEN_TELEMETRY_CONTAINER.getLogs(); + if (!logs.contains("OpenTelemetryLogger installed")) { + throw new Exception("Failure to append logs to OpenTelemetry"); + } } } diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java index de6166442ea..47ccd67f331 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java @@ -130,6 +130,7 @@ public void submitSQLJob(String sqlFile, Path... 
jars) commands.add(copyToContainerTmpPath(jobManager, constructDistJar(jars))); commands.add("--sql.script.file"); commands.add(containerSqlFile); + commands.add("--enable.log.report true"); ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", commands)); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java index a59d9c9e982..83c14a113b3 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnvJRE8.java @@ -45,6 +45,7 @@ public static void before() { .withNetworkAliases(INTER_CONTAINER_TM_ALIAS) .withExposedPorts(DEBUG_PORT) .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) + .withEnv("OTEL_EXPORTER_ENDPOINT", "logcollector:4317") .dependsOn(jobManager) .withLogConsumer(new Slf4jLogConsumer(TM_LOG)); diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/OpenTelemetryContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/OpenTelemetryContainer.java new file mode 100644 index 00000000000..0d07e85a9e8 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/OpenTelemetryContainer.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests.utils; + +import org.testcontainers.containers.GenericContainer; + +public class OpenTelemetryContainer extends GenericContainer { + + public static final String IMAGE = "otel/opentelemetry-collector-contrib:0.110.0"; + public static final Integer PORT = 4317; + public OpenTelemetryContainer() { + super(IMAGE); + addExposedPort(PORT); + } +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/otel-config.yaml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/otel-config.yaml new file mode 100644 index 00000000000..ad48b9cb07e --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/env/otel-config.yaml @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +receivers: + otlp: + protocols: + grpc: + endpoint: logcollector:4317 +processors: + batch: + +exporters: + debug: + verbosity: detailed + +service: + pipelines: + logs: + receivers: [otlp] + processors: [batch] + exporters: [debug] \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql index 9f74d54ae75..6c4a3efb84c 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/mysql_test.sql @@ -10,7 +10,7 @@ CREATE TABLE test_input1 ( 'password' = 'inlong', 'database-name' = 'test', 'table-name' = 'test_input1', - 'scan.incremental.snapshot.enabled' = 'false', + 'scan.incremental.snapshot.enabled' = 'true', 'jdbc.properties.useSSL' = 'false', 'jdbc.properties.allowPublicKeyRetrieval' = 'true' ); diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 7e02b0ca96d..d2480be58fd 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -469,4 +469,10 @@ public final class Constants { .withDescription( "pulsar client auth params"); + public static final ConfigOption ENABLE_LOG_REPORT = + ConfigOptions.key("enable.log.report") + .booleanType() + .defaultValue(false) + .withDescription("Whether enable openTelemetry log report or not."); + } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml index e86ebe1c004..8c5639e1a7e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml @@ -151,6 +151,8 @@ false + io.opentelemetry* + com.squareup.* org.apache.inlong:* com.google.protobuf:* org.apache.kafka:* diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java index adff22e7cd6..7b1b3343171 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java @@ -17,6 
+17,7 @@ package org.apache.inlong.sort.iceberg.source; +import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter; import org.apache.inlong.sort.iceberg.source.reader.IcebergSourceReader; @@ -94,6 +95,7 @@ public class IcebergSource implements Source implements Source readerFunction, SplitAssignerFactory assignerFactory, Table table, - MetricOption metricOption) { + MetricOption metricOption, + boolean enableLogReport) { this.tableLoader = tableLoader; this.scanContext = scanContext; this.readerFunction = readerFunction; this.assignerFactory = assignerFactory; this.table = table; this.metricOption = metricOption; + this.enableLogReport = enableLogReport; } String name() { @@ -167,7 +171,7 @@ public SourceReader createReader(SourceReaderContext read InlongIcebergSourceReaderMetrics metrics = new InlongIcebergSourceReaderMetrics<>(readerContext.metricGroup(), lazyTable().name()); metrics.registerMetrics(metricOption); - return new IcebergSourceReader<>(metrics, readerFunction, readerContext); + return new IcebergSourceReader<>(metrics, readerFunction, readerContext, enableLogReport); } @Override @@ -522,9 +526,10 @@ public IcebergSource build() { } resolveMetricOption(); checkRequired(); + boolean enableLogReport = flinkConfig.get(Constants.ENABLE_LOG_REPORT); // Since builder already load the table, pass it to the source to avoid double loading return new IcebergSource( - tableLoader, context, readerFunction, splitAssignerFactory, table, metricOption); + tableLoader, context, readerFunction, splitAssignerFactory, table, metricOption, enableLogReport); } private void checkRequired() { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java index f55f63fadd3..7f5298a7bf5 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.iceberg.source; +import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.iceberg.IcebergReadableMetadata; import org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter; @@ -83,6 +84,7 @@ public class IcebergTableSource private final Map properties; private final boolean isLimitPushDown; private final ReadableConfig readableConfig; + private final boolean enableLogReport; private IcebergTableSource(IcebergTableSource toCopy) { this.loader = toCopy.loader; @@ -95,6 +97,7 @@ private IcebergTableSource(IcebergTableSource toCopy) { this.readableConfig = toCopy.readableConfig; this.producedDataType = toCopy.producedDataType; this.metadataKeys = toCopy.metadataKeys; + this.enableLogReport = toCopy.enableLogReport; } public IcebergTableSource( @@ -124,6 +127,7 @@ private IcebergTableSource( this.readableConfig = readableConfig; this.producedDataType = schema.toPhysicalRowDataType(); this.metadataKeys = new ArrayList<>(); + this.enableLogReport = readableConfig.get(Constants.ENABLE_LOG_REPORT); } @Override diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java index a8ad9b52594..e97adc01d4c 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.iceberg.source.reader; +import org.apache.inlong.sort.base.util.OpenTelemetryLogger; + import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; @@ -25,6 +27,7 @@ import org.apache.iceberg.flink.source.split.IcebergSourceSplit; import org.apache.iceberg.flink.source.split.SplitRequestEvent; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.logging.log4j.Level; import java.util.Collection; import java.util.Collections; @@ -40,20 +43,34 @@ public class IcebergSourceReader SingleThreadMultiplexSourceReaderBase, T, IcebergSourceSplit, IcebergSourceSplit> { private final InlongIcebergSourceReaderMetrics metrics; + private OpenTelemetryLogger openTelemetryLogger; + private final boolean enableLogReport; + public IcebergSourceReader( InlongIcebergSourceReaderMetrics metrics, ReaderFunction readerFunction, - SourceReaderContext context) { + SourceReaderContext context, + boolean enableLogReport) { super( () -> new IcebergSourceSplitReader<>(metrics, readerFunction, context), new IcebergSourceRecordEmitter<>(), context.getConfiguration(), context); this.metrics = metrics; + this.enableLogReport = enableLogReport; + if (this.enableLogReport) { + this.openTelemetryLogger = new OpenTelemetryLogger.Builder() + .setLogLevel(Level.ERROR) + .setServiceName(this.getClass().getSimpleName()) + .setLocalHostIp(this.context.getLocalHostName()).build(); + } } @Override public void start() { + if (this.enableLogReport) { + this.openTelemetryLogger.install(); + } // We request a split only if we did not get splits during the checkpoint restore. // Otherwise, reader restarts will keep requesting more and more splits. 
if (getNumberOfCurrentlyAssignedSplits() == 0) { @@ -61,6 +78,14 @@ public void start() { } } + @Override + public void close() throws Exception { + super.close(); + if (this.enableLogReport) { + openTelemetryLogger.uninstall(); + } + } + @Override protected void onSplitFinished(Map finishedSplitIds) { requestSplit(Lists.newArrayList(finishedSplitIds.keySet())); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml index 7a65cd6a716..9b629a11480 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml @@ -76,6 +76,8 @@ false + io.opentelemetry* + com.squareup.* org.apache.inlong:* org.apache.kafka:* org.apache.flink:flink-connector-kafka diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java index 5004ec34a32..7fe014c54c0 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java @@ -101,6 +101,7 @@ public class KafkaSource private final KafkaDeserializationSchema metricSchema; // The configurations. private final Properties props; + private final boolean enableLogReport; KafkaSource( KafkaSubscriber subscriber, @@ -109,7 +110,8 @@ public class KafkaSource Boundedness boundedness, KafkaRecordDeserializationSchema deserializationSchema, KafkaDeserializationSchema metricSchema, - Properties props) { + Properties props, + boolean enableLogReport) { this.subscriber = subscriber; this.startingOffsetsInitializer = startingOffsetsInitializer; this.stoppingOffsetsInitializer = stoppingOffsetsInitializer; @@ -117,6 +119,7 @@ public class KafkaSource this.deserializationSchema = deserializationSchema; this.metricSchema = metricSchema; this.props = props; + this.enableLogReport = enableLogReport; } /** @@ -175,7 +178,8 @@ public UserCodeClassLoader getUserCodeClassLoader() { toConfiguration(props), readerContext, kafkaSourceReaderMetrics, - metricSchema); + metricSchema, + enableLogReport); } @Internal diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java index 58bb651b249..7e0422e4814 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java @@ -105,6 +105,7 @@ public class KafkaSourceBuilder { private KafkaDeserializationSchema metricSchema; // The configurations. protected Properties props; + private boolean enableLogReport; KafkaSourceBuilder() { this.subscriber = null; @@ -407,6 +408,11 @@ public KafkaSourceBuilder setMetricSchema(KafkaDeserializationSchema enableLogReport(boolean enableLogReport) { + this.enableLogReport = enableLogReport; + return this; + } + /** * Build the {@link KafkaSource}. 
* @@ -422,7 +428,8 @@ public KafkaSource build() { boundedness, deserializationSchema, metricSchema, - props); + props, + enableLogReport); } // ------------- private helpers -------------- diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java index 4643887c497..9474269a8a5 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.kafka.source.reader; +import org.apache.inlong.sort.base.util.OpenTelemetryLogger; import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema; import org.apache.flink.annotation.Internal; @@ -37,6 +38,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.logging.log4j.Level; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +68,8 @@ public class KafkaSourceReader private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics; private final boolean commitOffsetsOnCheckpoint; private final KafkaDeserializationSchema metricSchema; + private OpenTelemetryLogger openTelemetryLogger; + private final boolean enableLogReport; public KafkaSourceReader( FutureCompletingBlockingQueue>> elementsQueue, @@ -74,7 +78,8 @@ public KafkaSourceReader( Configuration config, SourceReaderContext context, KafkaSourceReaderMetrics kafkaSourceReaderMetrics, - KafkaDeserializationSchema metricSchema) { + KafkaDeserializationSchema metricSchema, + boolean enableLogReport) { super(elementsQueue, kafkaSourceFetcherManager, recordEmitter, config, context); this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); this.offsetsOfFinishedSplits = new ConcurrentHashMap<>(); @@ -82,11 +87,34 @@ public KafkaSourceReader( this.commitOffsetsOnCheckpoint = config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT); this.metricSchema = metricSchema; + this.enableLogReport = enableLogReport; if (!commitOffsetsOnCheckpoint) { LOG.warn( "Offset commit on checkpoint is disabled. 
" + "Consuming offset will not be reported back to Kafka cluster."); } + if (this.enableLogReport) { + this.openTelemetryLogger = new OpenTelemetryLogger.Builder() + .setLogLevel(Level.ERROR) + .setServiceName(this.getClass().getSimpleName()) + .setLocalHostIp(this.context.getLocalHostName()).build(); + } + } + + @Override + public void start() { + if (this.enableLogReport) { + this.openTelemetryLogger.install(); + } + super.start(); + } + + @Override + public void close() throws Exception { + super.close(); + if (this.enableLogReport) { + openTelemetryLogger.uninstall(); + } } @Override diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java index 9b0b0aff646..ef7e34778ce 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java @@ -163,6 +163,8 @@ public class KafkaDynamicSource private final MetricOption metricOption; + private final boolean enableLogReport; + public KafkaDynamicSource( DataType physicalDataType, @Nullable DecodingFormat> keyDecodingFormat, @@ -178,7 +180,8 @@ public KafkaDynamicSource( long startupTimestampMillis, boolean upsertMode, String tableIdentifier, - MetricOption metricOption) { + MetricOption metricOption, + boolean enableLogReport) { // Format attributes this.physicalDataType = Preconditions.checkNotNull( @@ -213,6 +216,7 @@ public KafkaDynamicSource( this.upsertMode = upsertMode; this.tableIdentifier = tableIdentifier; this.metricOption = metricOption; + this.enableLogReport = enableLogReport; } @Override @@ -328,7 +332,8 @@ public DynamicTableSource copy() { startupTimestampMillis, upsertMode, tableIdentifier, - metricOption); + metricOption, + enableLogReport); copy.producedDataType = producedDataType; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; @@ -366,7 +371,8 @@ public boolean equals(Object o) { && Objects.equals(upsertMode, that.upsertMode) && Objects.equals(tableIdentifier, that.tableIdentifier) && Objects.equals(watermarkStrategy, that.watermarkStrategy) - && Objects.equals(metricOption, that.metricOption); + && Objects.equals(metricOption, that.metricOption) + && Objects.equals(enableLogReport, that.enableLogReport); } @Override @@ -389,7 +395,8 @@ public int hashCode() { upsertMode, tableIdentifier, watermarkStrategy, - metricOption); + metricOption, + enableLogReport); } // -------------------------------------------------------------------------------------------- @@ -443,7 +450,8 @@ protected KafkaSource createKafkaSource( kafkaSourceBuilder .setProperties(properties) .setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer)) - .setMetricSchema(kafkaDeserializer); + .setMetricSchema(kafkaDeserializer) + .enableLogReport(enableLogReport); return kafkaSourceBuilder.build(); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java index 3a320b7f881..1070039174a 100644 --- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java @@ -200,6 +200,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); String auditHostAndPorts = tableOptions.get(INLONG_AUDIT); String auditKeys = tableOptions.get(AUDIT_KEYS); + Boolean enableLogReport = context.getConfiguration().get(ENABLE_LOG_REPORT); MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) @@ -220,7 +221,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { startupOptions.specificOffsets, startupOptions.startupTimestampMillis, context.getObjectIdentifier().asSummaryString(), - metricOption); + metricOption, + enableLogReport); } @Override @@ -387,7 +389,8 @@ protected KafkaDynamicSource createKafkaTableSource( Map specificStartupOffsets, long startupTimestampMillis, String tableIdentifier, - MetricOption metricOption) { + MetricOption metricOption, + boolean enableLogReport) { return new KafkaDynamicSource( physicalDataType, keyDecodingFormat, @@ -403,7 +406,8 @@ protected KafkaDynamicSource createKafkaTableSource( startupTimestampMillis, false, tableIdentifier, - metricOption); + metricOption, + enableLogReport); } protected KafkaDynamicSink createKafkaTableSink( diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java index 6a8f5e20c92..2173ee57187 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java @@ -133,7 +133,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); String auditHostAndPorts = tableOptions.get(INLONG_AUDIT); String auditKeys = tableOptions.get(AUDIT_KEYS); - + Boolean enableLogReport = context.getConfiguration().get(ENABLE_LOG_REPORT); MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withAuditAddress(auditHostAndPorts) @@ -155,7 +155,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { 0, true, context.getObjectIdentifier().asSummaryString(), - metricOption); + metricOption, + enableLogReport); } @Override diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml index 46f3d5b12f9..43665043e54 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml @@ -95,6 +95,8 @@ false + io.opentelemetry* + com.squareup.* org.apache.inlong:* io.debezium:debezium-api io.debezium:debezium-embedded diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java index 55aec6ed1c9..3273f3ee2d0 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java @@ -86,6 +86,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat private final Properties jdbcProperties; private final Duration heartbeatInterval; private final String chunkKeyColumn; + private final boolean enableLogReport; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -121,6 +122,7 @@ public MySqlTableSource( double distributionFactorLower, StartupOptions startupOptions, boolean scanNewlyAddedTableEnabled, + boolean enableLogReport, Properties jdbcProperties, Duration heartbeatInterval, @Nullable String chunkKeyColumn, @@ -136,6 +138,7 @@ public MySqlTableSource( this.serverTimeZone = serverTimeZone; this.dbzProperties = dbzProperties; this.enableParallelRead = enableParallelRead; + this.enableLogReport = enableLogReport; this.splitSize = splitSize; this.splitMetaGroupSize = splitMetaGroupSize; this.fetchSize = fetchSize; @@ -206,6 +209,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .startupOptions(startupOptions) .deserializer(deserializer) .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) + .enableLogReport(enableLogReport) .jdbcProperties(jdbcProperties) .heartbeatInterval(heartbeatInterval) .chunkKeyColumn(chunkKeyColumn) @@ -285,6 +289,7 @@ public DynamicTableSource copy() { distributionFactorLower, startupOptions, scanNewlyAddedTableEnabled, + enableLogReport, jdbcProperties, heartbeatInterval, chunkKeyColumn, @@ -311,6 +316,7 @@ public boolean equals(Object o) { && distributionFactorUpper == that.distributionFactorUpper && distributionFactorLower == that.distributionFactorLower && scanNewlyAddedTableEnabled == that.scanNewlyAddedTableEnabled + && enableLogReport == that.enableLogReport && Objects.equals(physicalSchema, that.physicalSchema) && Objects.equals(hostname, that.hostname) && Objects.equals(database, that.database) @@ -358,6 +364,7 @@ public int hashCode() { producedDataType, metadataKeys, scanNewlyAddedTableEnabled, + enableLogReport, jdbcProperties, heartbeatInterval, chunkKeyColumn, diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java index f903780a367..0dc854dfa2e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.mysql; +import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.MetricOption; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions; @@ -69,6 +70,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String username = config.get(USERNAME); final String password = config.get(PASSWORD); final String 
databaseName = config.get(DATABASE_NAME); + final Boolean enableLogReport = context.getConfiguration().get(Constants.ENABLE_LOG_REPORT); validateRegex(DATABASE_NAME.key(), databaseName); final String tableName = config.get(TABLE_NAME); validateRegex(TABLE_NAME.key(), tableName); @@ -129,6 +131,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { distributionFactorLower, startupOptions, scanNewlyAddedTableEnabled, + enableLogReport, getJdbcProperties(context.getCatalogTable().getOptions()), heartbeatInterval, chunkKeyColumn, @@ -337,7 +340,6 @@ public Set> optionalOptions() { + "\"-U\" represents UPDATE_BEFORE.\n" + "\"+U\" represents UPDATE_AFTER.\n" + "\"-D\" represents DELETE."); - // ---------------------------------------------------------------------------- // experimental options, won't add them to documentation // ---------------------------------------------------------------------------- diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java index da1dcbadae7..3d4af2caaec 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java @@ -103,6 +103,7 @@ public class MySqlSource private final MySqlSourceConfigFactory configFactory; private final DebeziumDeserializationSchema deserializationSchema; + private final boolean enableLogReport; /** * Get a MySqlParallelSourceBuilder to build a {@link MySqlSource}. 
@@ -116,9 +117,11 @@ public static MySqlSourceBuilder builder() { MySqlSource( MySqlSourceConfigFactory configFactory, - DebeziumDeserializationSchema deserializationSchema) { + DebeziumDeserializationSchema deserializationSchema, + boolean enableLogReport) { this.configFactory = configFactory; this.deserializationSchema = deserializationSchema; + this.enableLogReport = enableLogReport; } public MySqlSourceConfigFactory getConfigFactory() { @@ -168,7 +171,8 @@ public SourceReader createReader(SourceReaderContext readerContex readerContext.getConfiguration(), mySqlSourceReaderContext, sourceConfig, - deserializationSchema); + deserializationSchema, + enableLogReport); } @Override diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java index 775da14035e..3b4b7ff811d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java @@ -51,6 +51,7 @@ public class MySqlSourceBuilder { private final MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory(); private DebeziumDeserializationSchema deserializer; + private boolean enableLogReport; public MySqlSourceBuilder hostname(String hostname) { this.configFactory.hostname(hostname); @@ -235,12 +236,17 @@ public MySqlSourceBuilder heartbeatInterval(Duration heartbeatInterval) { return this; } + public MySqlSourceBuilder enableLogReport(boolean enableLogReport) { + this.enableLogReport = enableLogReport; + return this; + } + /** * Build the MySqlSource * * @return a MySqlParallelSource with the settings made for this builder. 
*/ public MySqlSource build() { - return new MySqlSource<>(configFactory, checkNotNull(deserializer)); + return new MySqlSource<>(configFactory, checkNotNull(deserializer), enableLogReport); } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java index 01f34f28b15..6fbd7b96ba4 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.mysql.source.reader; +import org.apache.inlong.sort.base.util.OpenTelemetryLogger; import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema; import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; @@ -55,6 +56,7 @@ import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.logging.log4j.Level; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +90,8 @@ public class MySqlSourceReader private final MySqlSourceReaderContext mySqlSourceReaderContext; private MySqlBinlogSplit suspendedBinlogSplit; private final DebeziumDeserializationSchema metricSchema; + private OpenTelemetryLogger openTelemetryLogger; + private final boolean enableLogReport; public MySqlSourceReader( FutureCompletingBlockingQueue> elementQueue, @@ -95,7 +99,8 @@ public MySqlSourceReader( RecordEmitter recordEmitter, Configuration config, MySqlSourceReaderContext context, - MySqlSourceConfig sourceConfig, DebeziumDeserializationSchema metricSchema) { + MySqlSourceConfig sourceConfig, DebeziumDeserializationSchema metricSchema, + boolean enableLogReport) { super( elementQueue, new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get), @@ -109,15 +114,33 @@ public MySqlSourceReader( this.mySqlSourceReaderContext = context; this.suspendedBinlogSplit = null; this.metricSchema = metricSchema; + this.enableLogReport = enableLogReport; + if (enableLogReport) { + this.openTelemetryLogger = new OpenTelemetryLogger.Builder() + .setLogLevel(Level.ERROR) + .setServiceName(this.getClass().getSimpleName()) + .setLocalHostIp(this.context.getLocalHostName()).build(); + } } @Override public void start() { + if (enableLogReport) { + openTelemetryLogger.install(); + } if (getNumberOfCurrentlyAssignedSplits() == 0) { context.sendSplitRequest(); } } + @Override + public void close() throws Exception { + super.close(); + if (enableLogReport) { + openTelemetryLogger.uninstall(); + } + } + @Override protected MySqlSplitState initializedState(MySqlSplit split) { if (split.isSnapshotSplit()) { diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml index 6a90273d632..8d7b5086d61 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml @@ -242,6 +242,8 @@ true + io.opentelemetry* + com.squareup.* org.apache.inlong:* 
io.streamnative.connectors:pulsar-flink-connector-origin* io.streamnative.connectors:flink-protobuf diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java index 780dfde8b53..7fa2a4cddc0 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java @@ -48,9 +48,7 @@ import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; -import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; -import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; -import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; +import static org.apache.inlong.sort.base.Constants.*; import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.createKeyFormatProjection; import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.createValueFormatProjection; import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.getKeyDecodingFormat; @@ -125,7 +123,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { final StartCursor startCursor = getStartCursor(tableOptions); final StopCursor stopCursor = getStopCursor(tableOptions); final SubscriptionType subscriptionType = getSubscriptionType(tableOptions); - + final boolean enableLogReport = context.getConfiguration().get(ENABLE_LOG_REPORT); // Forward source configs final Properties properties = getPulsarProperties(tableOptions); properties.setProperty(PULSAR_ADMIN_URL.key(), tableOptions.get(ADMIN_URL)); @@ -163,7 +161,6 @@ public DynamicTableSource createDynamicTableSource(Context context) { final DecodingFormat> decodingFormatForMetadataPushdown = valueDecodingFormat; final ChangelogMode changelogMode = decodingFormatForMetadataPushdown.getChangelogMode(); - return new PulsarTableSource( deserializationSchemaFactory, decodingFormatForMetadataPushdown, @@ -172,7 +169,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { properties, startCursor, stopCursor, - subscriptionType); + subscriptionType, + enableLogReport); } @Override diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java index b7b608ec826..756d2cb1a84 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java @@ -91,6 +91,8 @@ public final class PulsarSource /** The pulsar deserialization schema used for deserializing message. */ private final PulsarDeserializationSchema deserializationSchema; + private final boolean enableLogReport; + /** * The constructor for PulsarSource, it's package protected for forcing using {@link * PulsarSourceBuilder}. 
@@ -102,7 +104,8 @@ public final class PulsarSource StartCursor startCursor, StopCursor stopCursor, Boundedness boundedness, - PulsarDeserializationSchema deserializationSchema) { + PulsarDeserializationSchema deserializationSchema, + boolean enableLogReport) { this.sourceConfiguration = sourceConfiguration; this.subscriber = subscriber; this.rangeGenerator = rangeGenerator; @@ -110,6 +113,7 @@ public final class PulsarSource this.stopCursor = stopCursor; this.boundedness = boundedness; this.deserializationSchema = deserializationSchema; + this.enableLogReport = enableLogReport; } /** @@ -136,7 +140,7 @@ public SourceReader createReader(SourceReaderContext deserializationSchema.open(initializationContext, sourceConfiguration); return PulsarSourceReaderFactory.create( - readerContext, deserializationSchema, sourceConfiguration); + readerContext, deserializationSchema, sourceConfiguration, enableLogReport); } @Internal diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java index d8c05ce152f..f783aa3bcf7 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java @@ -127,6 +127,7 @@ public final class PulsarSourceBuilder { private StopCursor stopCursor; private Boundedness boundedness; private PulsarDeserializationSchema deserializationSchema; + private boolean enableLogReport; // private builder constructor. PulsarSourceBuilder() { @@ -412,6 +413,11 @@ public PulsarSourceBuilder setProperties(Properties properties) { return this; } + public PulsarSourceBuilder enableLogReport(boolean enableLogReport) { + this.enableLogReport = enableLogReport; + return this; + } + /** * Build the {@link PulsarSource}. 
* @@ -498,7 +504,8 @@ public PulsarSource build() { startCursor, stopCursor, boundedness, - deserializationSchema); + deserializationSchema, + enableLogReport); } // ------------- private helpers -------------- diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java index bbc42c149f4..3bccfda1f3b 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java @@ -64,7 +64,8 @@ private PulsarSourceReaderFactory() { public static SourceReader create( SourceReaderContext readerContext, PulsarDeserializationSchema deserializationSchema, - SourceConfiguration sourceConfiguration) { + SourceConfiguration sourceConfiguration, + boolean enableLogReport) { PulsarClient pulsarClient = createClient(sourceConfiguration); PulsarAdmin pulsarAdmin = createAdmin(sourceConfiguration); @@ -93,7 +94,8 @@ public static SourceReader create( sourceConfiguration, pulsarClient, pulsarAdmin, - deserializationSchema); + deserializationSchema, + enableLogReport); } else if (subscriptionType == SubscriptionType.Shared || subscriptionType == SubscriptionType.Key_Shared) { TransactionCoordinatorClient coordinatorClient = @@ -119,7 +121,8 @@ public static SourceReader create( pulsarClient, pulsarAdmin, coordinatorClient, - deserializationSchema); + deserializationSchema, + enableLogReport); } else { throw new UnsupportedOperationException( "This subscription type is not " + subscriptionType + " supported currently."); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java index 3c75793f93f..75ac35c025e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java @@ -83,14 +83,16 @@ public PulsarOrderedSourceReader( SourceConfiguration sourceConfiguration, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, - PulsarDeserializationSchema deserializationSchema) { + PulsarDeserializationSchema deserializationSchema, + boolean enableLogReport) { super( elementsQueue, new PulsarOrderedFetcherManager<>(elementsQueue, splitReaderSupplier::get), context, sourceConfiguration, pulsarClient, - pulsarAdmin); + pulsarAdmin, + enableLogReport); this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); this.cursorsOfFinishedSplits = new ConcurrentHashMap<>(); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java index f7e6bafc1da..3d9bc6a5174 100644 --- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.pulsar.source.reader; +import org.apache.inlong.sort.base.util.OpenTelemetryLogger; + import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SourceReaderBase; @@ -44,6 +46,8 @@ abstract class PulsarSourceReaderBase protected final SourceConfiguration sourceConfiguration; protected final PulsarClient pulsarClient; protected final PulsarAdmin pulsarAdmin; + private OpenTelemetryLogger openTelemetryLogger; + protected final boolean enableLogReport; protected PulsarSourceReaderBase( FutureCompletingBlockingQueue>> elementsQueue, @@ -51,7 +55,8 @@ protected PulsarSourceReaderBase( SourceReaderContext context, SourceConfiguration sourceConfiguration, PulsarClient pulsarClient, - PulsarAdmin pulsarAdmin) { + PulsarAdmin pulsarAdmin, + boolean enableLogReport) { super( elementsQueue, splitFetcherManager, @@ -62,6 +67,13 @@ protected PulsarSourceReaderBase( this.sourceConfiguration = sourceConfiguration; this.pulsarClient = pulsarClient; this.pulsarAdmin = pulsarAdmin; + this.enableLogReport = enableLogReport; + if (enableLogReport) { + this.openTelemetryLogger = new OpenTelemetryLogger.Builder() + .setLogLevel(org.apache.logging.log4j.Level.ERROR) + .setServiceName(this.getClass().getSimpleName()) + .setLocalHostIp(this.context.getLocalHostName()).build(); + } } @Override @@ -75,6 +87,14 @@ protected PulsarPartitionSplit toSplitType( return splitState.toPulsarPartitionSplit(); } + @Override + public void start() { + if (enableLogReport) { + this.openTelemetryLogger.install(); + } + super.start(); + } + @Override public void close() throws Exception { // Close the all the consumers first. @@ -83,5 +103,8 @@ public void close() throws Exception { // Close shared pulsar resources. 
pulsarClient.shutdown(); pulsarAdmin.close(); + if (enableLogReport) { + openTelemetryLogger.uninstall(); + } } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java index adf15de0b11..f30d7b3477a 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java @@ -81,14 +81,16 @@ public PulsarUnorderedSourceReader( PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, @Nullable TransactionCoordinatorClient coordinatorClient, - PulsarDeserializationSchema deserializationSchema) { + PulsarDeserializationSchema deserializationSchema, + boolean enableLogReport) { super( elementsQueue, new PulsarUnorderedFetcherManager<>(elementsQueue, splitReaderSupplier::get), context, sourceConfiguration, pulsarClient, - pulsarAdmin); + pulsarAdmin, + enableLogReport); this.coordinatorClient = coordinatorClient; this.transactionsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); this.transactionsOfFinishedSplits = Collections.synchronizedList(new ArrayList<>()); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java index e5df5486500..cd73a7bfce1 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java @@ -86,6 +86,8 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada private final SubscriptionType subscriptionType; + private final boolean enableLogReport; + public PulsarTableSource( PulsarTableDeserializationSchemaFactory deserializationSchemaFactory, DecodingFormat> decodingFormatForReadingMetadata, @@ -94,7 +96,8 @@ public PulsarTableSource( Properties properties, StartCursor startCursor, StopCursor stopCursor, - SubscriptionType subscriptionType) { + SubscriptionType subscriptionType, + boolean enableLogReport) { // Format attributes this.deserializationSchemaFactory = checkNotNull(deserializationSchemaFactory); this.decodingFormatForReadingMetadata = checkNotNull(decodingFormatForReadingMetadata); @@ -105,6 +108,7 @@ public PulsarTableSource( this.startCursor = checkNotNull(startCursor); this.stopCursor = checkNotNull(stopCursor); this.subscriptionType = checkNotNull(subscriptionType); + this.enableLogReport = enableLogReport; } @Override @@ -127,6 +131,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { // only support exclusive since shared mode requires pulsar with transaction enabled // and supporting transaction consumes more resources in pulsar broker .setSubscriptionType(SubscriptionType.Exclusive) + .enableLogReport(enableLogReport) .build(); return SourceProvider.of(source); } @@ -194,7 +199,8 @@ public DynamicTableSource copy() { properties, 
startCursor, stopCursor, - subscriptionType); + subscriptionType, + enableLogReport); } @Override @@ -215,7 +221,8 @@ public boolean equals(Object o) { && Objects.equals(properties, that.properties) && Objects.equals(startCursor, that.startCursor) && Objects.equals(stopCursor, that.stopCursor) - && subscriptionType == that.subscriptionType; + && subscriptionType == that.subscriptionType + && Objects.equals(enableLogReport, that.enableLogReport); } @Override @@ -228,6 +235,7 @@ public int hashCode() { properties, startCursor, stopCursor, - subscriptionType); + subscriptionType, + enableLogReport); } }
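
Note (not part of the patch above): the Kafka, Pulsar, MySQL-CDC and Iceberg source readers touched by this change all follow the same wiring pattern: the table factory reads enable.log.report (Constants.ENABLE_LOG_REPORT, default false) from the job configuration, passes it down through the source builder to the reader, and the reader builds an OpenTelemetryLogger in its constructor, installs it in start() and uninstalls it in close(). The sketch below condenses that pattern under stated assumptions: OpenTelemetryLogger is the existing helper in sort-flink base that the patch references but does not show, only the builder methods that actually appear in the diff are used, and the reader class itself is a hypothetical placeholder rather than any class from the patch.

import org.apache.inlong.sort.base.util.OpenTelemetryLogger;

import org.apache.logging.log4j.Level;

/**
 * Minimal sketch of the reader-side pattern introduced by this patch.
 * The class name and fields are illustrative; only the OpenTelemetryLogger
 * calls shown in the diff (Builder, install, uninstall) are used.
 */
public class LogReportingSourceReader implements AutoCloseable {

    // Mirrors Constants.ENABLE_LOG_REPORT ("enable.log.report", default false).
    private final boolean enableLogReport;
    private OpenTelemetryLogger openTelemetryLogger;

    public LogReportingSourceReader(boolean enableLogReport, String localHostIp) {
        this.enableLogReport = enableLogReport;
        if (enableLogReport) {
            // Prepare a log4j appender that forwards ERROR-level logs to the
            // OpenTelemetry collector; in the e2e tests the task manager is
            // pointed at it via OTEL_EXPORTER_ENDPOINT=logcollector:4317.
            this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
                    .setLogLevel(Level.ERROR)
                    .setServiceName(getClass().getSimpleName())
                    .setLocalHostIp(localHostIp)
                    .build();
        }
    }

    public void start() {
        if (enableLogReport) {
            openTelemetryLogger.install();
        }
        // ... regular reader start-up continues here ...
    }

    @Override
    public void close() throws Exception {
        // ... regular reader shutdown runs first ...
        if (enableLogReport) {
            openTelemetryLogger.uninstall();
        }
    }
}

In the end-to-end tests above, the option is switched on by appending "--enable.log.report true" to the submitted Sort job, the collector is started from env/otel-config.yaml with a debug exporter, and each test asserts that the collector's logs contain "OpenTelemetryLogger installed".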