Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Apr 19, 2024
1 parent e4a9b26 commit 6ba1f26
Show file tree
Hide file tree
Showing 359 changed files with 1,173 additions and 1,168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.inlong.agent.pojo.PulsarTask.PulsarTaskConfig;
import org.apache.inlong.agent.pojo.RedisTask.RedisTaskConfig;
import org.apache.inlong.agent.pojo.SqlServerTask.SqlserverTaskConfig;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;

Expand Down Expand Up @@ -453,9 +453,9 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
String mqType = dataConfig.getMqClusters().get(0).getMqType();
task.setMqClusters(GSON.toJson(dataConfig.getMqClusters()));
task.setTopicInfo(GSON.toJson(dataConfig.getTopicInfo()));
if (mqType.equals(MQType.PULSAR)) {
if (mqType.equals(Constants.MQType.PULSAR)) {
task.setSink(PULSAR_SINK);
} else if (mqType.equals(MQType.KAFKA)) {
} else if (mqType.equals(Constants.MQType.KAFKA)) {
task.setSink(KAFKA_SINK);
} else {
throw new IllegalArgumentException("input dataConfig" + dataConfig + "is invalid please check");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
Expand Down Expand Up @@ -203,7 +203,7 @@ private void createMessageSender(String tagName) throws Exception {

proxyClientConfig.setIoThreadNum(ioThreadNum);
proxyClientConfig.setEnableBusyWait(enableBusyWait);
proxyClientConfig.setProtocolType(ProtocolType.TCP);
proxyClientConfig.setProtocolType(Constants.ProtocolType.TCP);

SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" + sourcePath,
Thread.currentThread().isDaemon());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.inlong.audit.base.HighPriorityThreadFactory;
import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.pojo.audit.MQInfo;
import org.apache.inlong.common.util.NetworkUtils;

Expand Down Expand Up @@ -289,7 +289,7 @@ private void initTopicProducer(String topic) {
ConfigManager configManager = ConfigManager.getInstance();
List<MQInfo> mqInfoList = configManager.getMqInfoList();
mqInfoList.forEach(mqClusterInfo -> {
if (MQType.KAFKA.equals(mqClusterInfo.getMqType())) {
if (Constants.MQType.KAFKA.equals(mqClusterInfo.getMqType())) {
kafkaServerUrl = mqClusterInfo.getUrl();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.inlong.audit.consts.ConfigConstants;
import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.pojo.audit.MQInfo;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
Expand Down Expand Up @@ -391,7 +391,7 @@ private TubeClientConfig initTubeConfig() throws Exception {
ConfigManager configManager = ConfigManager.getInstance();
List<MQInfo> mqInfoList = configManager.getMqInfoList();
mqInfoList.forEach(mqClusterInfo -> {
if (MQType.TUBEMQ.equals(mqClusterInfo.getMqType())) {
if (Constants.MQType.TUBEMQ.equals(mqClusterInfo.getMqType())) {
masterHostAndPortList = mqClusterInfo.getUrl();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.sink.EventStat;
import org.apache.inlong.audit.utils.LogCounter;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.pojo.audit.MQInfo;
import org.apache.inlong.common.util.NetworkUtils;

Expand Down Expand Up @@ -102,7 +102,7 @@ public PulsarClientService(Context context) {
ConfigManager configManager = ConfigManager.getInstance();
List<MQInfo> mqInfoList = configManager.getMqInfoList();
mqInfoList.forEach(mqClusterInfo -> {
if (MQType.PULSAR.equals(mqClusterInfo.getMqType())) {
if (Constants.MQType.PULSAR.equals(mqClusterInfo.getMqType())) {
pulsarServerUrl = mqClusterInfo.getUrl();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.inlong.audit.service.consume.KafkaConsume;
import org.apache.inlong.audit.service.consume.PulsarConsume;
import org.apache.inlong.audit.service.consume.TubeConsume;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
import org.apache.inlong.common.pojo.audit.MQInfo;

Expand Down Expand Up @@ -87,15 +87,15 @@ public void afterPropertiesSet() {
List<InsertData> insertServiceList = this.getInsertServiceList();

for (MQInfo mqInfo : mqInfoList) {
if (mqConfig.isPulsar() && MQType.PULSAR.equals(mqInfo.getMqType())) {
if (mqConfig.isPulsar() && Constants.MQType.PULSAR.equals(mqInfo.getMqType())) {
mqConfig.setPulsarServerUrl(mqInfo.getUrl());
mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
break;
} else if (mqConfig.isTube() && MQType.TUBEMQ.equals(mqInfo.getMqType())) {
} else if (mqConfig.isTube() && Constants.MQType.TUBEMQ.equals(mqInfo.getMqType())) {
mqConfig.setTubeMasterList(mqInfo.getUrl());
mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
break;
} else if (mqConfig.isKafka() && MQType.KAFKA.equals(mqInfo.getMqType())) {
} else if (mqConfig.isKafka() && Constants.MQType.KAFKA.equals(mqInfo.getMqType())) {
mqConfig.setKafkaServerUrl(mqInfo.getUrl());
mqConsume = new KafkaConsume(insertServiceList, storeConfig, mqConfig);
break;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,65 @@ public class Constants {
// Default audit version is -1
public static final long DEFAULT_AUDIT_VERSION = -1;

public static class DataNodeType {

public static final String KAFKA = "KAFKA";
public static final String PULSAR = "PULSAR";
public static final String CLS = "CLS";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
}

public static class DeserializationType {

public static final String INLONG_MSG = "INLONG_MSG";
public static final String INLONG_MSG_PB = "INLONG_MSG_PB";
public static final String CSV = "CSV";
public static final String KV = "KV";
}

public static class CompressionType {

public static final String GZIP = "GZIP";
public static final String SNAPPY = "SNAPPY";
public static final String LZO = "LZO";
public static final String NONE = "NONE";
}

/**
* Constants of MQ type.
*/
public static class MQType {

public static final String TUBEMQ = "TUBEMQ";
public static final String PULSAR = "PULSAR";
public static final String KAFKA = "KAFKA";
public static final String TDMQ_PULSAR = "TDMQ_PULSAR";

/**
* Not use any MQ
*/
public static final String NONE = "NONE";

}

/**
* Constants of protocol type.
*/
public static class ProtocolType {

public static final String TCP = "TCP";
public static final String UDP = "UDP";

public static final String HTTP = "HTTP";
public static final String HTTPS = "HTTPS";

}

public static class SinkType {

public static final String KAFKA = "KAFKA";
public static final String PULSAR = "PULSAR";
public static final String CLS = "CLS";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
}
}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.common.pojo.sort;

import org.apache.inlong.common.pojo.sort.dataflow.node.NodeConfig;
import org.apache.inlong.common.pojo.sort.node.NodeConfig;

import lombok.Data;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
package org.apache.inlong.common.pojo.sort.dataflow;

import org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;

import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
public class SourceConfig implements Serializable {

private String topic;
private String subscription;
private String encodingType;
private DeserializationConfig deserializationConfig;
private List<FieldConfig> fieldConfigs;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.inlong.common.pojo.sort.dataflow.deserialization;

import org.apache.inlong.common.constant.DeserializationType;
import org.apache.inlong.common.constant.Constants;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
Expand All @@ -26,10 +26,10 @@

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = InlongMsgDeserializationConfig.class, name = DeserializationType.INLONG_MSG),
@JsonSubTypes.Type(value = InlongMsgPbDeserialiationConfig.class, name = DeserializationType.INLONG_MSG_PB),
@JsonSubTypes.Type(value = CsvDeserializationConfig.class, name = DeserializationType.CSV),
@JsonSubTypes.Type(value = KvDeserializationConfig.class, name = DeserializationType.KV),
@JsonSubTypes.Type(value = InlongMsgDeserializationConfig.class, name = Constants.DeserializationType.INLONG_MSG),
@JsonSubTypes.Type(value = InlongMsgPbDeserialiationConfig.class, name = Constants.DeserializationType.INLONG_MSG_PB),
@JsonSubTypes.Type(value = CsvDeserializationConfig.class, name = Constants.DeserializationType.CSV),
@JsonSubTypes.Type(value = KvDeserializationConfig.class, name = Constants.DeserializationType.KV),
})
public interface DeserializationConfig extends Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,5 @@ public class KvDeserializationConfig implements DeserializationConfig {

private char entrySplitter;
private char kvSplitter;
private String streamId;
private Character escapeChar;
}
Loading

0 comments on commit 6ba1f26

Please sign in to comment.