Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Apr 20, 2024
1 parent 2a52a16 commit 61b8967
Show file tree
Hide file tree
Showing 72 changed files with 363 additions and 256 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.inlong.agent.pojo.PulsarTask.PulsarTaskConfig;
import org.apache.inlong.agent.pojo.RedisTask.RedisTaskConfig;
import org.apache.inlong.agent.pojo.SqlServerTask.SqlserverTaskConfig;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;

Expand Down Expand Up @@ -453,9 +453,9 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
String mqType = dataConfig.getMqClusters().get(0).getMqType();
task.setMqClusters(GSON.toJson(dataConfig.getMqClusters()));
task.setTopicInfo(GSON.toJson(dataConfig.getTopicInfo()));
if (mqType.equals(Constants.MQType.PULSAR)) {
if (mqType.equals(MQType.PULSAR)) {
task.setSink(PULSAR_SINK);
} else if (mqType.equals(Constants.MQType.KAFKA)) {
} else if (mqType.equals(MQType.KAFKA)) {
task.setSink(KAFKA_SINK);
} else {
throw new IllegalArgumentException("input dataConfig" + dataConfig + "is invalid please check");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
Expand Down Expand Up @@ -203,7 +203,7 @@ private void createMessageSender(String tagName) throws Exception {

proxyClientConfig.setIoThreadNum(ioThreadNum);
proxyClientConfig.setEnableBusyWait(enableBusyWait);
proxyClientConfig.setProtocolType(Constants.ProtocolType.TCP);
proxyClientConfig.setProtocolType(ProtocolType.TCP);

SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" + sourcePath,
Thread.currentThread().isDaemon());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.inlong.audit.base.HighPriorityThreadFactory;
import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.pojo.audit.MQInfo;
import org.apache.inlong.common.util.NetworkUtils;

Expand Down Expand Up @@ -289,7 +289,7 @@ private void initTopicProducer(String topic) {
ConfigManager configManager = ConfigManager.getInstance();
List<MQInfo> mqInfoList = configManager.getMqInfoList();
mqInfoList.forEach(mqClusterInfo -> {
if (Constants.MQType.KAFKA.equals(mqClusterInfo.getMqType())) {
if (MQType.KAFKA.equals(mqClusterInfo.getMqType())) {
kafkaServerUrl = mqClusterInfo.getUrl();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.inlong.audit.consts.ConfigConstants;
import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.pojo.audit.MQInfo;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
Expand Down Expand Up @@ -391,7 +391,7 @@ private TubeClientConfig initTubeConfig() throws Exception {
ConfigManager configManager = ConfigManager.getInstance();
List<MQInfo> mqInfoList = configManager.getMqInfoList();
mqInfoList.forEach(mqClusterInfo -> {
if (Constants.MQType.TUBEMQ.equals(mqClusterInfo.getMqType())) {
if (MQType.TUBEMQ.equals(mqClusterInfo.getMqType())) {
masterHostAndPortList = mqClusterInfo.getUrl();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.sink.EventStat;
import org.apache.inlong.audit.utils.LogCounter;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.pojo.audit.MQInfo;
import org.apache.inlong.common.util.NetworkUtils;

Expand Down Expand Up @@ -102,7 +102,7 @@ public PulsarClientService(Context context) {
ConfigManager configManager = ConfigManager.getInstance();
List<MQInfo> mqInfoList = configManager.getMqInfoList();
mqInfoList.forEach(mqClusterInfo -> {
if (Constants.MQType.PULSAR.equals(mqClusterInfo.getMqType())) {
if (MQType.PULSAR.equals(mqClusterInfo.getMqType())) {
pulsarServerUrl = mqClusterInfo.getUrl();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.inlong.audit.service.consume.KafkaConsume;
import org.apache.inlong.audit.service.consume.PulsarConsume;
import org.apache.inlong.audit.service.consume.TubeConsume;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
import org.apache.inlong.common.pojo.audit.MQInfo;

Expand Down Expand Up @@ -87,15 +87,15 @@ public void afterPropertiesSet() {
List<InsertData> insertServiceList = this.getInsertServiceList();

for (MQInfo mqInfo : mqInfoList) {
if (mqConfig.isPulsar() && Constants.MQType.PULSAR.equals(mqInfo.getMqType())) {
if (mqConfig.isPulsar() && MQType.PULSAR.equals(mqInfo.getMqType())) {
mqConfig.setPulsarServerUrl(mqInfo.getUrl());
mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
break;
} else if (mqConfig.isTube() && Constants.MQType.TUBEMQ.equals(mqInfo.getMqType())) {
} else if (mqConfig.isTube() && MQType.TUBEMQ.equals(mqInfo.getMqType())) {
mqConfig.setTubeMasterList(mqInfo.getUrl());
mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
break;
} else if (mqConfig.isKafka() && Constants.MQType.KAFKA.equals(mqInfo.getMqType())) {
} else if (mqConfig.isKafka() && MQType.KAFKA.equals(mqInfo.getMqType())) {
mqConfig.setKafkaServerUrl(mqInfo.getUrl());
mqConsume = new KafkaConsume(insertServiceList, storeConfig, mqConfig);
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.constant;

public class CompressionType {

public static final String GZIP = "GZIP";
public static final String SNAPPY = "SNAPPY";
public static final String LZO = "LZO";
public static final String NONE = "NONE";
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,65 +33,4 @@ public class Constants {
// Default audit version is -1
public static final long DEFAULT_AUDIT_VERSION = -1;

public static class DataNodeType {

public static final String KAFKA = "KAFKA";
public static final String PULSAR = "PULSAR";
public static final String CLS = "CLS";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
}

public static class DeserializationType {

public static final String INLONG_MSG = "INLONG_MSG";
public static final String INLONG_MSG_PB = "INLONG_MSG_PB";
public static final String CSV = "CSV";
public static final String KV = "KV";
}

public static class CompressionType {

public static final String GZIP = "GZIP";
public static final String SNAPPY = "SNAPPY";
public static final String LZO = "LZO";
public static final String NONE = "NONE";
}

/**
* Constants of MQ type.
*/
public static class MQType {

public static final String TUBEMQ = "TUBEMQ";
public static final String PULSAR = "PULSAR";
public static final String KAFKA = "KAFKA";
public static final String TDMQ_PULSAR = "TDMQ_PULSAR";

/**
* Not use any MQ
*/
public static final String NONE = "NONE";

}

/**
* Constants of protocol type.
*/
public static class ProtocolType {

public static final String TCP = "TCP";
public static final String UDP = "UDP";

public static final String HTTP = "HTTP";
public static final String HTTPS = "HTTPS";

}

public static class SinkType {

public static final String KAFKA = "KAFKA";
public static final String PULSAR = "PULSAR";
public static final String CLS = "CLS";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.constant;

public class DataNodeType {

public static final String KAFKA = "KAFKA";
public static final String PULSAR = "PULSAR";
public static final String CLS = "CLS";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.constant;

public class DeserializationType {

public static final String INLONG_MSG = "INLONG_MSG";
public static final String INLONG_MSG_PB = "INLONG_MSG_PB";
public static final String CSV = "CSV";
public static final String KV = "KV";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.constant;

/**
* Constants of MQ type.
*/
public class MQType {

public static final String TUBEMQ = "TUBEMQ";
public static final String PULSAR = "PULSAR";
public static final String KAFKA = "KAFKA";
public static final String TDMQ_PULSAR = "TDMQ_PULSAR";

/**
* Not use any MQ
*/
public static final String NONE = "NONE";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.constant;

/**
* Constants of protocol type.
*/
public class ProtocolType {

public static final String TCP = "TCP";
public static final String UDP = "UDP";

public static final String HTTP = "HTTP";
public static final String HTTPS = "HTTPS";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.constant;

public class SinkType {

public static final String KAFKA = "KAFKA";
public static final String PULSAR = "PULSAR";
public static final String CLS = "CLS";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@

package org.apache.inlong.common.pojo.sort.dataflow.deserialization;

import org.apache.inlong.common.constant.Constants;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.inlong.common.constant.DeserializationType;

import java.io.Serializable;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = InlongMsgDeserializationConfig.class, name = Constants.DeserializationType.INLONG_MSG),
@JsonSubTypes.Type(value = InlongMsgPbDeserialiationConfig.class, name = Constants.DeserializationType.INLONG_MSG_PB),
@JsonSubTypes.Type(value = CsvDeserializationConfig.class, name = Constants.DeserializationType.CSV),
@JsonSubTypes.Type(value = KvDeserializationConfig.class, name = Constants.DeserializationType.KV),
@JsonSubTypes.Type(value = InlongMsgDeserializationConfig.class, name = DeserializationType.INLONG_MSG),
@JsonSubTypes.Type(value = InlongMsgPbDeserialiationConfig.class, name = DeserializationType.INLONG_MSG_PB),
@JsonSubTypes.Type(value = CsvDeserializationConfig.class, name = DeserializationType.CSV),
@JsonSubTypes.Type(value = KvDeserializationConfig.class, name = DeserializationType.KV),
})
public interface DeserializationConfig extends Serializable {

Expand Down
Loading

0 comments on commit 61b8967

Please sign in to comment.