diff --git a/coordinator/pom.xml b/coordinator/pom.xml index 0ffbc770bc..e66384e80b 100644 --- a/coordinator/pom.xml +++ b/coordinator/pom.xml @@ -36,6 +36,10 @@ org.apache.uniffle rss-server-common + + org.apache.uniffle + shuffle-storage + org.apache.uniffle rss-internal-client diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AppInfo.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AppInfo.java index 16b9be1cc5..61b1ce0f70 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AppInfo.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AppInfo.java @@ -23,6 +23,7 @@ public class AppInfo implements Comparable { private String appId; private long updateTime; private long registrationTime; + private long finishTime; private String version; private String gitCommitId; @@ -67,6 +68,14 @@ public String getGitCommitId() { return gitCommitId; } + public void setFinishTime(long finishTime) { + this.finishTime = finishTime; + } + + public long getFinishTime() { + return finishTime; + } + @Override public int compareTo(AppInfo appInfo) { return Long.compare(registrationTime, appInfo.getRegistrationTime()); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java index afbcffd830..5f2f48b97a 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java @@ -54,6 +54,7 @@ import org.apache.uniffle.coordinator.strategy.storage.LowestIOSampleCostSelectStorageStrategy; import org.apache.uniffle.coordinator.strategy.storage.RankValue; import org.apache.uniffle.coordinator.strategy.storage.SelectStorageStrategy; +import org.apache.uniffle.coordinator.web.vo.AppInfoVO; public class ApplicationManager implements Closeable { @@ -78,6 +79,9 @@ public class ApplicationManager implements Closeable { // it's only for test case to check if status check has problem private boolean hasErrorInStatusCheck = false; + private CoordinatorServer coordinatorServer; + private CoordinatorAppHistoryManager coordinatorAppHistoryManager = null; + public ApplicationManager(CoordinatorConf conf) { storageStrategy = conf.get(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY); appIdToRemoteStorageInfo = JavaUtils.newConcurrentMap(); @@ -110,6 +114,9 @@ public ApplicationManager(CoordinatorConf conf) { break; } } + + coordinatorAppHistoryManager = CoordinatorAppHistoryManager.create(conf); + // the thread for checking application status checkAppScheduler = ThreadUtils.getDaemonSingleThreadScheduledExecutor("ApplicationManager"); checkAppScheduler.scheduleAtFixedRate( @@ -348,7 +355,16 @@ protected void statusCheck() { if (System.currentTimeMillis() - lastReport.getUpdateTime() > expired) { expiredAppIds.add(appId); appAndTimes.remove(appId); - appIdToUser.remove(appId); + String user = appIdToUser.remove(appId); + if (coordinatorServer != null && user != null && coordinatorAppHistoryManager != null) { + lastReport.setFinishTime(System.currentTimeMillis()); + AppInfoVO appInfoVO = coordinatorServer.getAppInfoV0(user, lastReport); + coordinatorAppHistoryManager.addAppInfo(appInfoVO); + // remove the remain appInfo in serverNode + for (ServerNode serverNode : coordinatorServer.getClusterManager().list()) { + serverNode.getAppIdToInfos().remove(appId); + } + } } } } @@ -546,6 +562,14 @@ public Map> getCurrentUserAndApp() { 
return currentUserAndApp; } + public List getCachedAppInfos(int currentAppSize) { + return coordinatorAppHistoryManager.getAppInfos(currentAppSize); + } + + public int getCachedAppInfosSize(int currentAppSize) { + return coordinatorAppHistoryManager.getAppInfosSize(currentAppSize); + } + public void close() { if (detectStorageScheduler != null) { detectStorageScheduler.shutdownNow(); @@ -553,6 +577,13 @@ public void close() { if (checkAppScheduler != null) { checkAppScheduler.shutdownNow(); } + if (coordinatorAppHistoryManager != null) { + coordinatorAppHistoryManager.close(); + } + } + + public void setCoordinatorServer(CoordinatorServer coordinatorServer) { + this.coordinatorServer = coordinatorServer; } public enum StrategyName { diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorAppHistoryManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorAppHistoryManager.java new file mode 100644 index 0000000000..1ce92f85be --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorAppHistoryManager.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.uniffle.coordinator; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider; +import org.apache.uniffle.coordinator.web.vo.AppInfoVO; +import org.apache.uniffle.storage.handler.impl.HadoopFileReader; +import org.apache.uniffle.storage.handler.impl.HadoopFileWriter; + +public class CoordinatorAppHistoryManager { + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorAppHistoryManager.class); + private final BlockingQueue appInfoQueue; + private volatile boolean isRunning; + private final Map cachedAppInfoMap; + private final int cachedAppInfoSize; + private final int dashboardAppInfoMaxSize; + private final CoordinatorConf conf; + + private Path path = null; + private HadoopFileWriter hadoopFileWriter = null; + private int batchSize = 0; + private long flushIntervalMs = 0; + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + public CoordinatorAppHistoryManager(CoordinatorConf conf) throws Exception { + this.conf = conf; + this.isRunning = true; + this.appInfoQueue = + new LinkedBlockingQueue<>( + this.conf.getInteger(CoordinatorConf.COORDINATOR_APP_HISTORY_CACHE_MAX_SIZE)); + this.cachedAppInfoSize = + this.conf.getInteger(CoordinatorConf.COORDINATOR_APP_HISTORY_CACHE_MAX_SIZE); + this.dashboardAppInfoMaxSize = this.cachedAppInfoSize; + this.cachedAppInfoMap = + new LinkedHashMap(this.cachedAppInfoSize, 0.75f, true) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > cachedAppInfoSize; + } + }; + String historyFile = this.conf.getString(CoordinatorConf.COORDINATOR_APP_HISTORY_PATH); + if (historyFile != null && !historyFile.isEmpty()) { + // Convert relative path to absolute path + if (historyFile.startsWith("file://./")) { + String absPath = new File(historyFile.substring(7)).getAbsolutePath(); + historyFile = "file://" + absPath; + } + this.path = new Path(historyFile); + FileSystem fileSystem = + HadoopFilesystemProvider.getFilesystem( + "rss_coordinator_app_history", path, this.conf.getHadoopConf()); + this.hadoopFileWriter = new HadoopFileWriter(fileSystem, path, this.conf.getHadoopConf()); + this.batchSize = this.conf.getInteger(CoordinatorConf.COORDINATOR_APP_HISTORY_BATCH_SIZE); + this.flushIntervalMs = + this.conf.getLong(CoordinatorConf.COORDINATOR_APP_HISTORY_FLUSH_INTERVAL_MS); + startPersistentThread(); + loadAppInfo(); + } + } + + public void startPersistentThread() { + Thread persistentThread = + new Thread( + () -> { + while (isRunning) { + try { + persistentAppInfoBatch(); + } catch (Exception e) { + LOG.error("Error in persistent thread", e); + } + } + }); + persistentThread.setName("AppHistoryPersistentThread"); + persistentThread.setDaemon(true); + persistentThread.start(); + } + + private void persistentAppInfoBatch() throws InterruptedException { + List batch = new ArrayList<>(batchSize); + long startTime = System.currentTimeMillis(); + + while (batch.size() < batchSize && 
(System.currentTimeMillis() - startTime) < flushIntervalMs) { + AppInfoVO appInfoVO = + appInfoQueue.poll( + flushIntervalMs - (System.currentTimeMillis() - startTime), TimeUnit.MILLISECONDS); + if (appInfoVO != null) { + batch.add(appInfoVO); + } else { + break; + } + } + + if (!batch.isEmpty()) { + persistentAppInfo(batch); + } + } + + public synchronized void addAppInfo(AppInfoVO appInfoVO) { + cachedAppInfoMap.put(appInfoVO.getAppId(), appInfoVO); + if (!appInfoQueue.offer(appInfoVO)) { + LOG.warn( + "Failed to add app info to the persistence queue for appId: {} because the queue is full, size: {}", + appInfoVO.getAppId(), + appInfoQueue.size()); + } + } + + public void loadAppInfo() { + HadoopFileReader hadoopFileReader = null; + try { + hadoopFileReader = new HadoopFileReader(path, this.conf.getHadoopConf()); + // read the last N bytes to make sure the most recent cachedAppInfoSize AppInfoVOs are loaded + long readSize = cachedAppInfoSize * 1024L; + long fileSize = hadoopFileReader.getFileLen(); + if (fileSize < readSize) { + readSize = fileSize; + } + long offset = fileSize - readSize; + byte[] data = hadoopFileReader.read(offset, (int) readSize); + String dataStr = new String(data, StandardCharsets.UTF_8); + String[] lines = dataStr.split("\n"); + for (String line : lines) { + if (line.isEmpty()) { + continue; + } + try { + AppInfoVO appInfoVO = objectMapper.readValue(line, AppInfoVO.class); + cachedAppInfoMap.put(appInfoVO.getAppId(), appInfoVO); + } catch (JsonProcessingException e) { + LOG.warn("Skipping invalid JSON line: {}. Error: {}", line, e.getMessage(), e); + } + } + } catch (IllegalStateException e) { + LOG.error( + "load app info from {} failed, got IllegalStateException: {}", path, e.getMessage()); + } catch (Exception e) { + LOG.error("load app info from {} failed, got Exception: {}", path, e.getMessage()); + } finally { + if (hadoopFileReader != null) { + try { + hadoopFileReader.close(); + } catch (IOException e) { + LOG.error("close hadoopFileReader from {} failed, Exception: {}", path, e.getMessage()); + } + } + } + } + + public void persistentAppInfo(List<AppInfoVO> appInfos) { + try { + for (AppInfoVO appInfoVO : appInfos) { + String jsonString = objectMapper.writeValueAsString(appInfoVO); + hadoopFileWriter.writeData(jsonString.getBytes(StandardCharsets.UTF_8)); + hadoopFileWriter.writeData( + "\n" + .getBytes( + StandardCharsets.UTF_8)); // Add a newline character to separate JSON objects + } + hadoopFileWriter.flush(); + } catch (IOException e) { + LOG.error("write data failed, Exception: {}", e.getMessage()); + } + } + + public void close() { + isRunning = false; + try { + hadoopFileWriter.close(); + } catch (Exception e) { + LOG.error("close failed, Exception: {}", e.getMessage()); + } + } + + public List<AppInfoVO> getAppInfos(int currentAppSize) { + // get the most recent needSize AppInfoVOs + int needSize = Math.max(0, dashboardAppInfoMaxSize - currentAppSize); + if (cachedAppInfoMap.size() < needSize) { + return new ArrayList<>(cachedAppInfoMap.values()); + } + return new ArrayList<>(cachedAppInfoMap.values()) + .subList(cachedAppInfoMap.size() - needSize, cachedAppInfoMap.size()); + } + + public int getAppInfosSize(int currentAppSize) { + int needSize = Math.max(0, dashboardAppInfoMaxSize - currentAppSize); + return Math.min(cachedAppInfoMap.size(), needSize); + } + + public static CoordinatorAppHistoryManager create(CoordinatorConf conf) { + if (conf.getBoolean(CoordinatorConf.COORDINATOR_APP_HISTORY_ENABLE)) { + try { + return new CoordinatorAppHistoryManager(conf); + } catch (Exception e) { + LOG.error("create app history manager 
failed, Exception: {}", e.getMessage()); + } + } + return null; + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index 5b1f64ff16..4086a02db3 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -19,6 +19,8 @@ import java.util.List; +import org.apache.hadoop.conf.Configuration; + import org.apache.uniffle.common.config.ConfigOption; import org.apache.uniffle.common.config.ConfigOptions; import org.apache.uniffle.common.config.ConfigUtils; @@ -36,6 +38,8 @@ */ public class CoordinatorConf extends RssBaseConf { + public static final String PREFIX_HADOOP_CONF = "rss.coordinator.hadoop"; + public static final ConfigOption COORDINATOR_CLIENT_CONF_APPLY_STRATEGY = ConfigOptions.key("rss.coordinator.client.confApplyStrategy") .stringType() @@ -257,6 +261,34 @@ public class CoordinatorConf extends RssBaseConf { .defaultValues("appHeartbeat", "heartbeat") .withDescription("Exclude record rpc audit operation list, separated by ','"); + public static final ConfigOption COORDINATOR_APP_HISTORY_ENABLE = + ConfigOptions.key("rss.coordinator.app.history.enable") + .booleanType() + .defaultValue(true) + .withDescription("Enable app history."); + + public static final ConfigOption COORDINATOR_APP_HISTORY_CACHE_MAX_SIZE = + ConfigOptions.key("rss.coordinator.app.history.cache.max.size") + .intType() + .defaultValue(1000) + .withDescription("The max size for storing app history info in cache."); + + public static final ConfigOption COORDINATOR_APP_HISTORY_PATH = + ConfigOptions.key("rss.coordinator.app.history.persist.path") + .stringType() + .defaultValue("file://./logs/application_history.txt") + .withDescription("The path for storing app history info."); + public static final ConfigOption COORDINATOR_APP_HISTORY_BATCH_SIZE = + ConfigOptions.key("rss.coordinator.app.history.flush.batch.size") + .intType() + .defaultValue(100) + .withDescription("The batch size for flush app history info."); + public static final ConfigOption COORDINATOR_APP_HISTORY_FLUSH_INTERVAL_MS = + ConfigOptions.key("rss.coordinator.app.history.flush.interval.ms") + .longType() + .defaultValue(5000L) + .withDescription("The flush interval for storing app history info."); + public CoordinatorConf() {} public CoordinatorConf(String fileName) { @@ -267,6 +299,18 @@ public CoordinatorConf(String fileName) { } } + public Configuration getHadoopConf() { + Configuration hadoopConf = new Configuration(); + for (String key : getKeySet()) { + if (key.startsWith(CoordinatorConf.PREFIX_HADOOP_CONF)) { + String value = getString(key, ""); + String hadoopKey = key.substring(CoordinatorConf.PREFIX_HADOOP_CONF.length() + 1); + hadoopConf.set(hadoopKey, value); + } + } + return hadoopConf; + } + public boolean loadConfFromFile(String fileName) { return loadConfFromFile(fileName, ConfigUtils.getAllConfigOptions(CoordinatorConf.class)); } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java index d618211e0e..7b5d24bfa2 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java @@ -219,6 +219,17 @@ public void heartbeat( try 
(CoordinatorRPCAuditContext auditContext = createAuditContext("heartbeat")) { final ServerNode serverNode = toServerNode(request); auditContext.withArgs("serverNode=" + serverNode.getId()); + ServerNode oldServerNode = null; + try { + oldServerNode = coordinatorServer.getClusterManager().getServerNodeById(serverNode.getId()); + } catch (Exception ignored) { + // Ignore this exception. + // Because when receiving the heartbeat for the first time, there will be no + // server node information and an exception will be thrown. + } + // combine the appInfo list from oldServerNode into serverNode + serverNode.combineAppInfos(oldServerNode); + coordinatorServer.getClusterManager().add(serverNode); final ShuffleServerHeartBeatResponse response = ShuffleServerHeartBeatResponse.newBuilder() @@ -519,7 +530,8 @@ private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) { request.getServerId().getJettyPort(), request.getStartTimeMs(), request.getVersion(), - request.getGitCommitId()); + request.getGitCommitId(), + request.getAppInfosList()); } /** diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java index 847afda837..3bf63dfb92 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java @@ -17,6 +17,7 @@ package org.apache.uniffle.coordinator; +import java.util.Map; import java.util.function.Consumer; import io.prometheus.client.CollectorRegistry; @@ -48,6 +49,8 @@ import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy; import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory; import org.apache.uniffle.coordinator.util.CoordinatorUtils; +import org.apache.uniffle.coordinator.web.vo.AppInfoVO; +import org.apache.uniffle.proto.RssProtos; import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE; import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE; @@ -183,6 +186,7 @@ private void initialization() throws Exception { new ClusterManagerFactory(coordinatorConf, hadoopConf); this.clusterManager = clusterManagerFactory.getClusterManager(); + this.applicationManager.setCoordinatorServer(this); DynamicClientConfService dynamicClientConfService = new DynamicClientConfService( @@ -280,4 +284,37 @@ protected void blockUntilShutdown() throws InterruptedException { public long getStartTimeMs() { return startTimeMs; } + + public AppInfoVO getAppInfoV0(String user, AppInfo appInfo) { + AppInfoVO appInfoVO = + new AppInfoVO( + user, + appInfo.getAppId(), + appInfo.getUpdateTime(), + appInfo.getRegistrationTime(), + appInfo.getFinishTime(), + appInfo.getVersion(), + appInfo.getGitCommitId(), + 0, + 0, + 0, + 0, + 0, + 0, + 0); + for (ServerNode server : clusterManager.list()) { + Map<String, RssProtos.ApplicationInfo> appIdToInfos = server.getAppIdToInfos(); + if (appIdToInfos.containsKey(appInfoVO.getAppId())) { + RssProtos.ApplicationInfo app = appIdToInfos.get(appInfoVO.getAppId()); + appInfoVO.setPartitionNum(appInfoVO.getPartitionNum() + app.getPartitionNum()); + appInfoVO.setMemorySize(appInfoVO.getMemorySize() + app.getMemorySize()); + appInfoVO.setLocalFileNum(appInfoVO.getLocalFileNum() + app.getLocalFileNum()); + appInfoVO.setLocalTotalSize(appInfoVO.getLocalTotalSize() + app.getLocalTotalSize()); + appInfoVO.setHadoopFileNum(appInfoVO.getHadoopFileNum() + 
app.getHadoopFileNum()); + appInfoVO.setHadoopTotalSize(appInfoVO.getHadoopTotalSize() + app.getHadoopTotalSize()); + appInfoVO.setTotalSize(appInfoVO.getTotalSize() + app.getTotalSize()); + } + } + return appInfoVO; + } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java index ad992f0bc3..de3e441da6 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java @@ -17,35 +17,52 @@ package org.apache.uniffle.coordinator; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.uniffle.common.ServerStatus; import org.apache.uniffle.common.storage.StorageInfo; +import org.apache.uniffle.coordinator.web.vo.ServerNodeVO; +import org.apache.uniffle.proto.RssProtos; import org.apache.uniffle.proto.RssProtos.ShuffleServerId; public class ServerNode implements Comparable { + private final ServerNodeVO serverNodeVO; + private final Map appIdToInfos; - private String id; - private String ip; - private int grpcPort; - private long usedMemory; - private long preAllocatedMemory; - private long availableMemory; - private int eventNumInFlush; - private long registrationTime; - private long timestamp; - private Set tags; - private ServerStatus status; - private Map storageInfo; - private int nettyPort = -1; - private int jettyPort = -1; - private long startTime = -1; - private String version; - private String gitCommitId; + public Map getAppIdToInfos() { + return appIdToInfos; + } + + public void combineAppInfos(ServerNode oldServerNode) { + if (oldServerNode == null) { + return; + } + for (Map.Entry entry : + oldServerNode.getAppIdToInfos().entrySet()) { + if (entry.getValue() == null) { + continue; + } + if (!appIdToInfos.containsKey(entry.getKey())) { + appIdToInfos.put(entry.getKey(), entry.getValue()); + } else { + RssProtos.ApplicationInfo current = appIdToInfos.get(entry.getKey()); + RssProtos.ApplicationInfo other = entry.getValue(); + // keep the max info + if (current.getTotalSize() > other.getTotalSize()) { + appIdToInfos.put(entry.getKey(), current); + } else { + appIdToInfos.put(entry.getKey(), other); + } + } + } + } public ServerNode(String id) { this(id, "", 0, 0, 0, 0, 0, Sets.newHashSet(), ServerStatus.EXCLUDED); @@ -181,7 +198,8 @@ public ServerNode( jettyPort, startTime, "", - ""); + "", + Collections.emptyList()); } public ServerNode( @@ -199,141 +217,142 @@ public ServerNode( int jettyPort, long startTime, String version, - String gitCommitId) { - this.id = id; - this.ip = ip; - this.grpcPort = grpcPort; - this.usedMemory = usedMemory; - this.preAllocatedMemory = preAllocatedMemory; - this.availableMemory = availableMemory; - this.eventNumInFlush = eventNumInFlush; - this.registrationTime = System.currentTimeMillis(); - this.timestamp = registrationTime; - this.tags = tags; - this.status = status; - this.storageInfo = storageInfoMap; - if (nettyPort > 0) { - this.nettyPort = nettyPort; - } - if (jettyPort > 0) { - this.jettyPort = jettyPort; + String gitCommitId, + List appInfos) { + this.serverNodeVO = + new ServerNodeVO( + id, + ip, + grpcPort, + usedMemory, + preAllocatedMemory, + availableMemory, + eventNumInFlush, + tags, + status, + storageInfoMap, + nettyPort, + jettyPort, + startTime, 
+ version, + gitCommitId); + this.appIdToInfos = new ConcurrentHashMap<>(); + for (RssProtos.ApplicationInfo app : appInfos) { + this.appIdToInfos.put(app.getAppId(), app); } - this.startTime = startTime; - this.version = version; - this.gitCommitId = gitCommitId; } public ShuffleServerId convertToGrpcProto() { return ShuffleServerId.newBuilder() - .setId(id) - .setIp(ip) - .setPort(grpcPort) - .setNettyPort(nettyPort) - .setJettyPort(jettyPort) + .setId(serverNodeVO.getId()) + .setIp(serverNodeVO.getIp()) + .setPort(serverNodeVO.getGrpcPort()) + .setNettyPort(serverNodeVO.getNettyPort()) + .setJettyPort(serverNodeVO.getJettyPort()) .build(); } public String getId() { - return id; + return serverNodeVO.getId(); } public String getIp() { - return ip; + return serverNodeVO.getIp(); } public int getGrpcPort() { - return grpcPort; + return serverNodeVO.getGrpcPort(); } public long getTimestamp() { - return timestamp; + return serverNodeVO.getTimestamp(); } public long getPreAllocatedMemory() { - return preAllocatedMemory; + return serverNodeVO.getPreAllocatedMemory(); } public long getAvailableMemory() { - return availableMemory; + return serverNodeVO.getAvailableMemory(); } public int getEventNumInFlush() { - return eventNumInFlush; + return serverNodeVO.getEventNumInFlush(); } public long getUsedMemory() { - return usedMemory; + return serverNodeVO.getUsedMemory(); } public Set getTags() { - return tags; + return serverNodeVO.getTags(); } public ServerStatus getStatus() { - return status; + return serverNodeVO.getStatus(); } public void setStatus(ServerStatus serverStatus) { - this.status = serverStatus; + serverNodeVO.setStatus(serverStatus); } public Map getStorageInfo() { - return storageInfo; + return serverNodeVO.getStorageInfo(); } @Override public String toString() { return "ServerNode with id[" - + id + + serverNodeVO.getId() + "], ip[" - + ip + + serverNodeVO.getIp() + "], grpc port[" - + grpcPort + + serverNodeVO.getGrpcPort() + "], netty port[" - + nettyPort + + serverNodeVO.getNettyPort() + "], jettyPort[" - + jettyPort + + serverNodeVO.getJettyPort() + "], usedMemory[" - + usedMemory + + serverNodeVO.getUsedMemory() + "], preAllocatedMemory[" - + preAllocatedMemory + + serverNodeVO.getPreAllocatedMemory() + "], availableMemory[" - + availableMemory + + serverNodeVO.getAvailableMemory() + "], eventNumInFlush[" - + eventNumInFlush + + serverNodeVO.getEventNumInFlush() + "], timestamp[" - + timestamp + + serverNodeVO.getTimestamp() + "], tags[" - + tags.toString() + + serverNodeVO.getTags().toString() + "], status[" - + status + + serverNodeVO.getStatus() + "], storages[num=" - + storageInfo.size() + + serverNodeVO.getStorageInfo().size() + "], version[" - + version + + serverNodeVO.getVersion() + "], gitCommitId[" - + gitCommitId + + serverNodeVO.getGitCommitId() + "]"; } /** Only for test case */ public void setTimestamp(long timestamp) { - this.timestamp = timestamp; + serverNodeVO.setTimestamp(timestamp); } void setRegistrationTime(long registrationTime) { - this.registrationTime = registrationTime; + serverNodeVO.setRegistrationTime(registrationTime); } public long getRegistrationTime() { - return registrationTime; + return serverNodeVO.getRegistrationTime(); } @Override public int compareTo(ServerNode other) { - if (availableMemory > other.getAvailableMemory()) { + if (getAvailableMemory() > other.getAvailableMemory()) { return -1; - } else if (availableMemory < other.getAvailableMemory()) { + } else if (getAvailableMemory() < other.getAvailableMemory()) { return 1; } return 
0; @@ -341,38 +360,42 @@ public int compareTo(ServerNode other) { @Override public int hashCode() { - return id.hashCode(); + return serverNodeVO.getId().hashCode(); } @Override public boolean equals(Object obj) { if (obj instanceof ServerNode) { - return id.equals(((ServerNode) obj).getId()); + return serverNodeVO.getId().equals(((ServerNode) obj).getId()); } return false; } public long getTotalMemory() { - return availableMemory + usedMemory; + return serverNodeVO.getAvailableMemory() + serverNodeVO.getUsedMemory(); } public int getNettyPort() { - return nettyPort; + return serverNodeVO.getNettyPort(); } public int getJettyPort() { - return jettyPort; + return serverNodeVO.getJettyPort(); } public long getStartTime() { - return startTime; + return serverNodeVO.getStartTime(); } public String getVersion() { - return version; + return serverNodeVO.getVersion(); } public String getGitCommitId() { - return gitCommitId; + return serverNodeVO.getGitCommitId(); + } + + public ServerNodeVO getServerNodeVO() { + return serverNodeVO; } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java index 1bd73dbe40..d9ece4dc08 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java @@ -371,9 +371,6 @@ public int getNodesNum() { @Override public List list() { - for (Map.Entry entry : servers.entrySet()) { - ServerNode server = entry.getValue(); - } return Lists.newArrayList(servers.values()); } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/lombok.config b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/lombok.config index d5b9792240..0268471a44 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/lombok.config +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/lombok.config @@ -19,3 +19,5 @@ clear lombok.anyConstructor.flagUsage clear lombok.noArgsConstructor.flagUsage clear lombok.data.flagUsage clear lombok.builder.flagUsage +clear lombok.setter.flagUsage +clear lombok.getter.flagUsage diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ApplicationResource.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ApplicationResource.java index eaa7695d36..9f6747ee29 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ApplicationResource.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ApplicationResource.java @@ -36,6 +36,7 @@ import org.apache.uniffle.common.web.resource.Response; import org.apache.uniffle.coordinator.AppInfo; import org.apache.uniffle.coordinator.ApplicationManager; +import org.apache.uniffle.coordinator.CoordinatorServer; import org.apache.uniffle.coordinator.metric.CoordinatorMetrics; import org.apache.uniffle.coordinator.web.vo.AppInfoVO; import org.apache.uniffle.coordinator.web.vo.UserAppNumVO; @@ -65,10 +66,24 @@ public Response> getUserApps() { Map> currentUserAndApp = getApplicationManager().getCurrentUserAndApp(); List usercnt = new ArrayList<>(); + Map userAppNumMap = Maps.newHashMap(); + int currentUserAppNum = 0; for (Map.Entry> stringMapEntry : currentUserAndApp.entrySet()) { - String userName = stringMapEntry.getKey(); - usercnt.add(new UserAppNumVO(userName, stringMapEntry.getValue().size())); + currentUserAppNum += 
stringMapEntry.getValue().size(); + userAppNumMap.put(stringMapEntry.getKey(), stringMapEntry.getValue().size()); + } + getApplicationManager() + .getCachedAppInfos(currentUserAppNum) + .forEach( + appInfoVO -> { + userAppNumMap.computeIfAbsent(appInfoVO.getUserName(), k -> 0); + userAppNumMap.put( + appInfoVO.getUserName(), userAppNumMap.get(appInfoVO.getUserName()) + 1); + }); + for (Map.Entry stringIntegerEntry : userAppNumMap.entrySet()) { + String userName = stringIntegerEntry.getKey(); + usercnt.add(new UserAppNumVO(userName, stringIntegerEntry.getValue())); } // Display inverted by the number of user applications. usercnt.sort(Comparator.reverseOrder()); @@ -89,22 +104,23 @@ public Response> getAppInfoList() { for (Map.Entry appIdTimestampMap : userAppIdTimestampMap.getValue().entrySet()) { AppInfo appInfo = appIdTimestampMap.getValue(); - userToAppList.add( - new AppInfoVO( - userAppIdTimestampMap.getKey(), - appInfo.getAppId(), - appInfo.getUpdateTime(), - appInfo.getRegistrationTime(), - appInfo.getVersion(), - appInfo.getGitCommitId())); + AppInfoVO appInfoVO = + getCoordinatorServer().getAppInfoV0(userAppIdTimestampMap.getKey(), appInfo); + userToAppList.add(appInfoVO); } } + userToAppList.addAll(getApplicationManager().getCachedAppInfos(userToAppList.size())); // Display is inverted by the submission time of the application. userToAppList.sort(Comparator.reverseOrder()); return userToAppList; }); } + private CoordinatorServer getCoordinatorServer() { + return (CoordinatorServer) + servletContext.getAttribute(CoordinatorServer.class.getCanonicalName()); + } + private ApplicationManager getApplicationManager() { return (ApplicationManager) servletContext.getAttribute(ApplicationManager.class.getCanonicalName()); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java index ade6731124..58814f7bcb 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java @@ -51,6 +51,7 @@ import org.apache.uniffle.coordinator.web.request.ApplicationRequest; import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest; import org.apache.uniffle.coordinator.web.request.DecommissionRequest; +import org.apache.uniffle.coordinator.web.vo.ServerNodeVO; @Produces({MediaType.APPLICATION_JSON}) public class ServerResource extends BaseResource { @@ -70,23 +71,30 @@ public Response node(@PathParam("id") String id) { @GET @Path("/nodes") - public Response> nodes(@QueryParam("status") String status) { + public Response> nodes(@QueryParam("status") String status) { ClusterManager clusterManager = getClusterManager(); - List serverList; + List serverList; if (ServerStatus.UNHEALTHY.name().equalsIgnoreCase(status)) { - serverList = clusterManager.getUnhealthyServerList(); + serverList = + clusterManager.getUnhealthyServerList().stream() + .map(node -> node.getServerNodeVO()) + .collect(Collectors.toList()); } else if (ServerStatus.LOST.name().equalsIgnoreCase(status)) { - serverList = clusterManager.getLostServerList(); + serverList = + clusterManager.getLostServerList().stream() + .map(node -> node.getServerNodeVO()) + .collect(Collectors.toList()); } else if (ServerStatus.EXCLUDED.name().equalsIgnoreCase(status)) { serverList = clusterManager.getExcludedNodes().stream() - .map(ServerNode::new) + .map(ServerNodeVO::new) 
.collect(Collectors.toList()); } else { List serverAllList = clusterManager.list(); serverList = serverAllList.stream() .filter(node -> !clusterManager.getExcludedNodes().contains(node.getId())) + .map(node -> node.getServerNodeVO()) .collect(Collectors.toList()); } serverList = @@ -96,7 +104,7 @@ public Response> nodes(@QueryParam("status") String status) { return status == null || server.getStatus().name().equalsIgnoreCase(status); }) .collect(Collectors.toList()); - serverList.sort(Comparator.comparing(ServerNode::getId)); + serverList.sort(Comparator.comparing(ServerNodeVO::getId)); return Response.success(serverList); } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/vo/AppInfoVO.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/vo/AppInfoVO.java index 2d6c8e1d0f..922cebb61f 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/vo/AppInfoVO.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/vo/AppInfoVO.java @@ -17,23 +17,60 @@ package org.apache.uniffle.coordinator.web.vo; +import java.util.Objects; + import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.Getter; +import lombok.Setter; @Data @Builder @AllArgsConstructor +@Getter +@Setter public class AppInfoVO implements Comparable { private String userName; private String appId; private long updateTime; private long registrationTime; + private long finishTime; private String version; private String gitCommitId; + private long partitionNum; + private long memorySize; + private long localFileNum; + private long localTotalSize; + private long hadoopFileNum; + private long hadoopTotalSize; + private long totalSize; + + public AppInfoVO() {} + @Override public int compareTo(AppInfoVO appInfoVO) { return Long.compare(registrationTime, appInfoVO.getRegistrationTime()); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof AppInfoVO)) { + return false; + } + AppInfoVO appInfoVO = (AppInfoVO) o; + return updateTime == appInfoVO.updateTime + && registrationTime == appInfoVO.registrationTime + && userName.equals(appInfoVO.userName) + && appId.equals(appInfoVO.appId); + } + + @Override + public int hashCode() { + return Objects.hash(userName, appId, updateTime, registrationTime); + } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/vo/ServerNodeVO.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/vo/ServerNodeVO.java new file mode 100644 index 0000000000..e1aafd95d9 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/vo/ServerNodeVO.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.uniffle.coordinator.web.vo; + +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import lombok.Data; +import lombok.Getter; +import lombok.Setter; + +import org.apache.uniffle.common.ServerStatus; +import org.apache.uniffle.common.storage.StorageInfo; + +@Data +@Getter +@Setter +public class ServerNodeVO implements Comparable { + + private String id; + private String ip; + private int grpcPort; + private long usedMemory; + private long preAllocatedMemory; + private long availableMemory; + private int eventNumInFlush; + private long registrationTime; + private long timestamp; + private Set tags; + private ServerStatus status; + private Map storageInfo; + private int nettyPort = -1; + private int jettyPort = -1; + private long startTime = -1; + private String version; + private String gitCommitId; + + public ServerNodeVO(String id) { + this(id, "", 0, 0, 0, 0, 0, Sets.newHashSet(), ServerStatus.EXCLUDED); + } + + // Only for test + public ServerNodeVO( + String id, + String ip, + int port, + long usedMemory, + long preAllocatedMemory, + long availableMemory, + int eventNumInFlush, + Set tags) { + this( + id, + ip, + port, + usedMemory, + preAllocatedMemory, + availableMemory, + eventNumInFlush, + tags, + ServerStatus.ACTIVE, + Maps.newHashMap()); + } + + public ServerNodeVO( + String id, + String ip, + int port, + long usedMemory, + long preAllocatedMemory, + long availableMemory, + int eventNumInFlush, + Set tags, + ServerStatus status) { + this( + id, + ip, + port, + usedMemory, + preAllocatedMemory, + availableMemory, + eventNumInFlush, + tags, + status, + Maps.newHashMap()); + } + + public ServerNodeVO( + String id, + String ip, + int port, + long usedMemory, + long preAllocatedMemory, + long availableMemory, + int eventNumInFlush, + Set tags, + ServerStatus status, + Map storageInfoMap) { + this( + id, + ip, + port, + usedMemory, + preAllocatedMemory, + availableMemory, + eventNumInFlush, + tags, + status, + storageInfoMap, + -1, + -1, + -1); + } + + public ServerNodeVO( + String id, + String ip, + int grpcPort, + long usedMemory, + long preAllocatedMemory, + long availableMemory, + int eventNumInFlush, + Set tags, + ServerStatus status, + Map storageInfoMap, + int nettyPort) { + this( + id, + ip, + grpcPort, + usedMemory, + preAllocatedMemory, + availableMemory, + eventNumInFlush, + tags, + status, + storageInfoMap, + nettyPort, + -1, + -1L); + } + + public ServerNodeVO( + String id, + String ip, + int grpcPort, + long usedMemory, + long preAllocatedMemory, + long availableMemory, + int eventNumInFlush, + Set tags, + ServerStatus status, + Map storageInfoMap, + int nettyPort, + int jettyPort, + long startTime) { + this( + id, + ip, + grpcPort, + usedMemory, + preAllocatedMemory, + availableMemory, + eventNumInFlush, + tags, + status, + storageInfoMap, + nettyPort, + -1, + -1L, + "", + ""); + } + + public ServerNodeVO( + String id, + String ip, + int grpcPort, + long usedMemory, + long preAllocatedMemory, + long availableMemory, + int eventNumInFlush, + Set tags, + ServerStatus status, + Map storageInfoMap, + int nettyPort, + int jettyPort, + long startTime, + String version, + String gitCommitId) { + this.id = id; + this.ip = ip; + this.grpcPort = grpcPort; + this.usedMemory = usedMemory; + this.preAllocatedMemory = preAllocatedMemory; + this.availableMemory = availableMemory; + this.eventNumInFlush = eventNumInFlush; + this.registrationTime = System.currentTimeMillis(); + 
this.timestamp = registrationTime; + this.tags = tags; + this.status = status; + this.storageInfo = storageInfoMap; + if (nettyPort > 0) { + this.nettyPort = nettyPort; + } + if (jettyPort > 0) { + this.jettyPort = jettyPort; + } + this.startTime = startTime; + this.version = version; + this.gitCommitId = gitCommitId; + } + + @Override + public String toString() { + return "ServerNode with id[" + + id + + "], ip[" + + ip + + "], grpc port[" + + grpcPort + + "], netty port[" + + nettyPort + + "], jettyPort[" + + jettyPort + + "], usedMemory[" + + usedMemory + + "], preAllocatedMemory[" + + preAllocatedMemory + + "], availableMemory[" + + availableMemory + + "], eventNumInFlush[" + + eventNumInFlush + + "], timestamp[" + + timestamp + + "], tags[" + + tags.toString() + + "], status[" + + status + + "], storages[num=" + + storageInfo.size() + + "], version[" + + version + + "], gitCommitId[" + + gitCommitId + + "]"; + } + + @Override + public int compareTo(ServerNodeVO other) { + if (availableMemory > other.getAvailableMemory()) { + return -1; + } else if (availableMemory < other.getAvailableMemory()) { + return 1; + } + return 0; + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ServerNodeVO) { + return id.equals(((ServerNodeVO) obj).getId()); + } + return false; + } +} diff --git a/dashboard/src/main/webapp/src/pages/ApplicationPage.vue b/dashboard/src/main/webapp/src/pages/ApplicationPage.vue index 9290c5111a..aaaa64d446 100644 --- a/dashboard/src/main/webapp/src/pages/ApplicationPage.vue +++ b/dashboard/src/main/webapp/src/pages/ApplicationPage.vue @@ -80,14 +80,49 @@ sortable /> + + + + {{ row.version }}_{{ row.gitCommitId }} + + + + + + + + + {{ row.localFileNum }}({{ + memFormatter(null, null, row.localTotalSize) + }}) + + + + + + + {{ row.hadoopFileNum }}({{ + memFormatter(null, null, row.hadoopTotalSize) + }}) + + + @@ -97,7 +132,7 @@