Skip to content

Commit

Permalink
[INLONG-11349][Sort] Integrate opentelemetry for sort-connectors-v1.15
Browse files Browse the repository at this point in the history
  • Loading branch information
qy-liuhuo committed Oct 13, 2024
1 parent 12e9e1d commit ed4ea5e
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.iceberg.source.reader;

import org.apache.inlong.sort.base.util.OpenTelemetryLogger;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
Expand All @@ -25,6 +27,7 @@
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.logging.log4j.Level;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -40,6 +43,7 @@ public class IcebergSourceReader<T>
SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {

private final InlongIcebergSourceReaderMetrics<T> metrics;
private final OpenTelemetryLogger openTelemetryLogger;
public IcebergSourceReader(
InlongIcebergSourceReaderMetrics<T> metrics,
ReaderFunction<T> readerFunction,
Expand All @@ -50,17 +54,28 @@ public IcebergSourceReader(
context.getConfiguration(),
context);
this.metrics = metrics;
this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
.setLogLevel(Level.ERROR)
.setServiceName(this.getClass().getSimpleName())
.setLocalHostIp(this.context.getLocalHostName()).build();
}

@Override
public void start() {
    // Wire up OpenTelemetry log export before any split processing begins.
    openTelemetryLogger.install();
    // Only ask for a split when none were restored from a checkpoint;
    // asking unconditionally would pile up extra splits on every restart.
    final boolean restoredSplits = getNumberOfCurrentlyAssignedSplits() > 0;
    if (!restoredSplits) {
        requestSplit(Collections.emptyList());
    }
}

@Override
public void close() throws Exception {
    // Use try/finally so the OpenTelemetry logger is always uninstalled,
    // even when the superclass cleanup throws; otherwise the log-export
    // appender would leak past the reader's lifetime.
    try {
        super.close();
    } finally {
        openTelemetryLogger.uninstall();
    }
}

@Override
protected void onSplitFinished(Map<String, IcebergSourceSplit> finishedSplitIds) {
requestSplit(Lists.newArrayList(finishedSplitIds.keySet()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.kafka.source.reader;

import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;

import org.apache.flink.annotation.Internal;
Expand All @@ -37,6 +38,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,6 +68,7 @@ public class KafkaSourceReader<T>
private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
private final boolean commitOffsetsOnCheckpoint;
private final KafkaDeserializationSchema<RowData> metricSchema;
private final OpenTelemetryLogger openTelemetryLogger;

public KafkaSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue,
Expand All @@ -87,6 +90,22 @@ public KafkaSourceReader(
"Offset commit on checkpoint is disabled. "
+ "Consuming offset will not be reported back to Kafka cluster.");
}
this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
.setLogLevel(Level.ERROR)
.setServiceName(this.getClass().getSimpleName())
.setLocalHostIp(this.context.getLocalHostName()).build();
}

@Override
public void start() {
    // Install the OpenTelemetry log exporter first, then delegate to the
    // regular Kafka reader startup sequence.
    openTelemetryLogger.install();
    super.start();
}

@Override
public void close() throws Exception {
    // try/finally guarantees the OpenTelemetry log reporting is shut down
    // even if the superclass close fails part-way; without it an exception
    // from super.close() would leak the installed appender.
    try {
        super.close();
    } finally {
        openTelemetryLogger.uninstall(); // disable log reporting
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.mysql.source.reader;

import org.apache.inlong.sort.base.util.OpenTelemetryLogger;
import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.logging.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -88,6 +90,7 @@ public class MySqlSourceReader<T>
private final MySqlSourceReaderContext mySqlSourceReaderContext;
private MySqlBinlogSplit suspendedBinlogSplit;
private final DebeziumDeserializationSchema<T> metricSchema;
private final OpenTelemetryLogger openTelemetryLogger;

public MySqlSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,
Expand All @@ -109,15 +112,26 @@ public MySqlSourceReader(
this.mySqlSourceReaderContext = context;
this.suspendedBinlogSplit = null;
this.metricSchema = metricSchema;
this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
.setLogLevel(Level.ERROR)
.setServiceName(this.getClass().getSimpleName())
.setLocalHostIp(this.context.getLocalHostName()).build();
}

@Override
public void start() {
    // Enable OpenTelemetry log export for the lifetime of this reader.
    openTelemetryLogger.install();
    // Request work only when nothing was restored from a checkpoint,
    // so restarts do not accumulate duplicate split requests.
    final boolean hasAssignedSplits = getNumberOfCurrentlyAssignedSplits() > 0;
    if (!hasAssignedSplits) {
        context.sendSplitRequest();
    }
}

@Override
public void close() throws Exception {
    // Wrap in try/finally so the OpenTelemetry logger is uninstalled even
    // when the superclass cleanup throws; otherwise the appender installed
    // in start() would outlive the reader.
    try {
        super.close();
    } finally {
        openTelemetryLogger.uninstall();
    }
}

@Override
protected MySqlSplitState initializedState(MySqlSplit split) {
if (split.isSnapshotSplit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sort.pulsar.source.reader;

import org.apache.inlong.sort.base.util.OpenTelemetryLogger;

import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
Expand Down Expand Up @@ -44,6 +46,7 @@ abstract class PulsarSourceReaderBase<OUT>
protected final SourceConfiguration sourceConfiguration;
protected final PulsarClient pulsarClient;
protected final PulsarAdmin pulsarAdmin;
private final OpenTelemetryLogger openTelemetryLogger;

protected PulsarSourceReaderBase(
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> elementsQueue,
Expand All @@ -62,6 +65,10 @@ protected PulsarSourceReaderBase(
this.sourceConfiguration = sourceConfiguration;
this.pulsarClient = pulsarClient;
this.pulsarAdmin = pulsarAdmin;
this.openTelemetryLogger = new OpenTelemetryLogger.Builder()
.setLogLevel(org.apache.logging.log4j.Level.ERROR)
.setServiceName(this.getClass().getSimpleName())
.setLocalHostIp(this.context.getLocalHostName()).build();
}

@Override
Expand All @@ -75,6 +82,12 @@ protected PulsarPartitionSplit toSplitType(
return splitState.toPulsarPartitionSplit();
}

@Override
public void start() {
    // Attach the OpenTelemetry log exporter before the base reader spins up
    // its fetchers.
    openTelemetryLogger.install();
    super.start();
}

@Override
public void close() throws Exception {
// Close the all the consumers first.
Expand All @@ -83,5 +96,6 @@ public void close() throws Exception {
// Close shared pulsar resources.
pulsarClient.shutdown();
pulsarAdmin.close();
openTelemetryLogger.uninstall();
}
}

0 comments on commit ed4ea5e

Please sign in to comment.