removed unnecessary flink internal annotations and added copied information in every copied class
PeterZh6 committed Nov 25, 2024
1 parent 6d46470 commit b0cf263
Showing 14 changed files with 18 additions and 26 deletions.
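Every one of the 14 files gets the same two-part edit: the Flink API-audience annotations (@Internal, @PublicEvolving), which only document stability guarantees inside the upstream Flink codebase, are dropped from the copies, and the provenance comment on each copied class is updated to flink-connector-kafka:3.2.0 (or added where it was missing). A minimal Java sketch of the resulting shape, using a hypothetical class rather than one of the actual files:

// Hypothetical copied class illustrating the pattern this commit applies.
/**
 * A class copied out of the upstream Kafka connector.
 * copied from org.apache.flink:flink-connector-kafka:3.2.0
 */
public class CopiedExample { // was annotated @PublicEvolving before this commit

    // was annotated @Internal; both annotations only mark API audience for
    // Flink's own consumers and carry no meaning in a copied class.
    public void helper() {
    }
}

Dropping the annotations also lets the copied files drop their imports of org.apache.flink.annotation.Internal and org.apache.flink.annotation.PublicEvolving.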
@@ -83,10 +83,9 @@
* <p>See {@link KafkaSourceBuilder} for more details on how to configure this source.
*
* @param <OUT> the output type of the source.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
// TODO: Add a variable metricSchema to report audit information
-@PublicEvolving
public class KafkaSource<OUT>
implements
Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
@@ -137,7 +136,6 @@ public Boundedness getBoundedness() {
return this.boundedness;
}

-@Internal
@Override
public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext readerContext)
throws Exception {
@@ -187,7 +185,7 @@ public UserCodeClassLoader getUserCodeClassLoader() {
kafkaSourceReaderMetrics);
}

-@Internal
+
@Override
public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator(
SplitEnumeratorContext<KafkaPartitionSplit> enumContext) {
@@ -200,7 +198,7 @@ public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator(
boundedness);
}

-@Internal
+
@Override
public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumerator(
SplitEnumeratorContext<KafkaPartitionSplit> enumContext,
@@ -216,13 +214,13 @@ public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumerator(
checkpoint);
}

-@Internal
+
@Override
public SimpleVersionedSerializer<KafkaPartitionSplit> getSplitSerializer() {
return new KafkaPartitionSplitSerializer();
}

-@Internal
+
@Override
public SimpleVersionedSerializer<KafkaSourceEnumState> getEnumeratorCheckpointSerializer() {
return new KafkaSourceEnumStateSerializer();
@@ -86,10 +86,9 @@
*
* <p>Check the Java docs of each individual methods to learn more about the settings to build a
* KafkaSource.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
// TODO: Add a variable metricSchema to report audit information
-@PublicEvolving
public class KafkaSourceBuilder<OUT> {

private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceBuilder.class);
@@ -46,10 +46,9 @@
import java.util.concurrent.ConcurrentMap;

/** The source reader for Kafka partitions.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
// TODO: Add some method to make report audit information exactly once
-@Internal
public class KafkaSourceReader<T>
extends
SingleThreadMultiplexSourceReaderBase<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplit, KafkaPartitionSplitState> {
@@ -36,7 +36,7 @@
import java.util.List;

/** A specific {@link KafkaSerializationSchema} for {@link KafkaDynamicSource}.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
// TODO: support SourceExactlyMetric and add metric collection points
class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
@@ -32,7 +32,7 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link KafkaSink}.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> {

@@ -34,9 +34,8 @@

/**
* Options for the Kafka connector.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
-@PublicEvolving
public class KafkaConnectorOptions {

// --------------------------------------------------------------------------------------------
@@ -72,9 +72,8 @@
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;

/** Utilities for {@link KafkaConnectorOptions}.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
-@Internal
class KafkaConnectorOptionsUtil {

private static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT =
@@ -63,8 +63,8 @@

/**
* A version-agnostic Kafka {@link DynamicTableSink}.
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
-@Internal
public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetadata {

private static final String UPSERT_KAFKA_TRANSFORMATION = "upsert-kafka";
@@ -77,9 +77,8 @@

/**
* A version-agnostic Kafka {@link ScanTableSource}.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * org.apache.flink:flink-connector-kafka:3.2.0
*/
-@Internal
public class KafkaDynamicSource
implements
ScanTableSource,
@@ -99,9 +99,8 @@
/**
* Factory for creating configured instances of {@link KafkaDynamicSource} and {@link
* KafkaDynamicSink}.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * org.apache.flink:flink-connector-kafka:3.2.0
*/
-@Internal
public class KafkaDynamicTableFactory
implements
DynamicTableSourceFactory,
@@ -36,7 +36,7 @@
* <p>The sink provides eventual consistency guarantees under {@link
* org.apache.flink.connector.base.DeliveryGuarantee#AT_LEAST_ONCE} because the updates are
* idempotent therefore duplicates have no effect.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
class ReducingUpsertSink<WriterState, Comm>
implements
@@ -41,7 +41,7 @@
import static org.apache.inlong.sort.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;

/**
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
class ReducingUpsertWriter<WriterState, Comm>
implements
@@ -21,7 +21,7 @@
import java.util.Objects;

/** Sink buffer flush configuration.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
public class SinkBufferFlushMode implements Serializable {

@@ -82,7 +82,7 @@

/**
* Upsert-Kafka factory.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
public class UpsertKafkaDynamicTableFactory
implements