Skip to content

Commit

Permalink
metric monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Jun 30, 2024
1 parent d518962 commit caa99a9
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.standalone.metrics;

import org.apache.inlong.common.pojo.sort.SortConfig;

import org.apache.flume.conf.Configurable;

import java.util.Collection;

public interface SortConfigMetricListener extends Configurable {

void reportOffline(SortConfig sortConfig);
void reportOnline(SortConfig sortConfig);
void reportUpdate(SortConfig sortConfig);
void reportParseFail(String dataflowId);
void reportRequestConfigFail();
void reportDecompressFail();
void reportCheckFail();
void reportRequestNoUpdate();
void reportRequestUpdate();
void reportMissInSortClusterConfig(Collection<String> dataflows);
void reportMissInSortConfig(Collection<String> dataflows);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.standalone.metrics;

import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.conf.Configurable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

@Slf4j
public class SortConfigMetricReporter {

private static final String KEY_SORT_CONFIG_METRIC_LISTENER = "sortConfig.metricListener";
private static final AtomicBoolean isInited = new AtomicBoolean(false);
private static List<SortConfigMetricListener> listeners;

public static void init(Map<String, String> commonProperties) {
if (!isInited.compareAndSet(false, true)) {
return;
}

String listenerStr = commonProperties.get(KEY_SORT_CONFIG_METRIC_LISTENER);
if (StringUtils.isBlank(listenerStr)) {
log.warn("There is no specified SortConfigMetricListener");
listeners = new ArrayList<>();
return;
}
String[] listenerList = listenerStr.split("\\s+");
listeners = Arrays.stream(listenerList)
.map(SortConfigMetricReporter::loadConfigMetricReporter)
.filter(Objects::nonNull)
.collect(Collectors.toList());
log.info("SortConfigMetricListeners={}", listeners);
}

private static SortConfigMetricListener loadConfigMetricReporter(String type) {
if (StringUtils.isEmpty(type)) {
log.warn("There is no specified SortConfigMetricReporter type");
return null;
}
log.info("Create SortConfigMetricReporter:{}", type);
try {
Class<?> loaderClass = ClassUtils.getClass(type);
Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
if (loaderObject instanceof Configurable) {
((Configurable) loaderObject).configure(new Context(CommonPropertiesHolder.get()));
}
if (!(loaderObject instanceof SortConfigMetricListener)) {
log.error("Got exception when create SortConfigMetricReporter instance, config class:{}", type);
return null;
}
return (SortConfigMetricListener) loaderObject;
} catch (Exception e) {
log.info("failed to load SortConfigMetricReporter, type={}", type);
return null;
}
}

public static void reportOffline(SortConfig sortConfig) {
listeners.forEach(listener -> listener.reportOffline(sortConfig));
}

public static void reportOnline(SortConfig sortConfig) {
listeners.forEach(listener -> listener.reportOnline(sortConfig));
}

public static void reportUpdate(SortConfig sortConfig) {
listeners.forEach(listener -> listener.reportUpdate(sortConfig));
}

public static void reportParseFail(String dataflowId) {
listeners.forEach(listener -> listener.reportParseFail(dataflowId));
}

public static void reportRequestConfigFail() {
listeners.forEach(SortConfigMetricListener::reportRequestConfigFail);
}

public static void reportDecompressFail() {
listeners.forEach(SortConfigMetricListener::reportDecompressFail);
}

public static void reportCheckFail() {
listeners.forEach(SortConfigMetricListener::reportCheckFail);
}

public static void reportRequestNoUpdate() {
listeners.forEach(SortConfigMetricListener::reportRequestNoUpdate);
}

public static void reportRequestUpdate() {
listeners.forEach(SortConfigMetricListener::reportRequestUpdate);
}

public static void reportMissInSortClusterConfig(Collection<String> dataflows) {
listeners.forEach(listener -> listener.reportMissInSortClusterConfig(dataflows));
}

public static void reportMissInSortConfig(Collection<String> dataflows) {
listeners.forEach(listener -> listener.reportMissInSortConfig(dataflows));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.common.metric.MetricObserver;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;

Expand Down Expand Up @@ -47,6 +48,7 @@ public void run() {
cluster.close();
}
});
SortConfigMetricReporter.init(CommonPropertiesHolder.get());
// start the cluster
cluster.start();
// metrics
Expand Down

0 comments on commit caa99a9

Please sign in to comment.