Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2211] feat(dashboard) Display the write information of application in dashboard #2214

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,8 @@ private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) {
request.getServerId().getJettyPort(),
request.getStartTimeMs(),
request.getVersion(),
request.getGitCommitId());
request.getGitCommitId(),
request.getApplicationInfoList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@

package org.apache.uniffle.coordinator;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.ShuffleServerId;

public class ServerNode implements Comparable<ServerNode> {
Expand All @@ -46,6 +50,7 @@ public class ServerNode implements Comparable<ServerNode> {
private long startTime = -1;
private String version;
private String gitCommitId;
Map<String, RssProtos.ApplicationInfo> appIdToInfos;

public ServerNode(String id) {
this(id, "", 0, 0, 0, 0, 0, Sets.newHashSet(), ServerStatus.EXCLUDED);
Expand Down Expand Up @@ -181,7 +186,8 @@ public ServerNode(
jettyPort,
startTime,
"",
"");
"",
Collections.EMPTY_LIST);
}

public ServerNode(
Expand All @@ -199,7 +205,8 @@ public ServerNode(
int jettyPort,
long startTime,
String version,
String gitCommitId) {
String gitCommitId,
List<RssProtos.ApplicationInfo> appInfos) {
this.id = id;
this.ip = ip;
this.grpcPort = grpcPort;
Expand All @@ -221,6 +228,8 @@ public ServerNode(
this.startTime = startTime;
this.version = version;
this.gitCommitId = gitCommitId;
this.appIdToInfos = new ConcurrentHashMap<>();
appInfos.forEach(appInfo -> appIdToInfos.put(appInfo.getAppId(), appInfo));
}

public ShuffleServerId convertToGrpcProto() {
Expand Down Expand Up @@ -375,4 +384,8 @@ public String getVersion() {
public String getGitCommitId() {
return gitCommitId;
}

public Map<String, RssProtos.ApplicationInfo> getAppIdToInfos() {
return appIdToInfos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@
import org.apache.uniffle.common.web.resource.Response;
import org.apache.uniffle.coordinator.AppInfo;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.coordinator.ServerNode;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import org.apache.uniffle.coordinator.web.vo.AppInfoVO;
import org.apache.uniffle.coordinator.web.vo.UserAppNumVO;
import org.apache.uniffle.proto.RssProtos;

@Produces({MediaType.APPLICATION_JSON})
public class ApplicationResource extends BaseResource {
Expand Down Expand Up @@ -84,19 +87,44 @@ public Response<List<AppInfoVO>> getAppInfoList() {
List<AppInfoVO> userToAppList = new ArrayList<>();
Map<String, Map<String, AppInfo>> currentUserAndApp =
getApplicationManager().getCurrentUserAndApp();
List<ServerNode> serverNodes = getCoordinatorServer().getClusterManager().list();
for (Map.Entry<String, Map<String, AppInfo>> userAppIdTimestampMap :
currentUserAndApp.entrySet()) {
for (Map.Entry<String, AppInfo> appIdTimestampMap :
userAppIdTimestampMap.getValue().entrySet()) {
String user = appIdTimestampMap.getKey();
AppInfo appInfo = appIdTimestampMap.getValue();
userToAppList.add(
AppInfoVO appInfoVO =
new AppInfoVO(
userAppIdTimestampMap.getKey(),
user,
appInfo.getAppId(),
appInfo.getUpdateTime(),
appInfo.getRegistrationTime(),
appInfo.getVersion(),
appInfo.getGitCommitId()));
appInfo.getGitCommitId(),
0,
0,
0,
0,
0,
0,
0);
for (ServerNode server : serverNodes) {
Map<String, RssProtos.ApplicationInfo> appIdToInfos = server.getAppIdToInfos();
if (appIdToInfos.containsKey(appInfoVO.getAppId())) {
RssProtos.ApplicationInfo app = appIdToInfos.get(appInfoVO.getAppId());
appInfoVO.setPartitionNum(appInfoVO.getPartitionNum() + app.getPartitionNum());
appInfoVO.setMemorySize(appInfoVO.getMemorySize() + app.getMemorySize());
appInfoVO.setLocalFileNum(appInfoVO.getLocalFileNum() + app.getLocalFileNum());
appInfoVO.setLocalTotalSize(
appInfoVO.getLocalTotalSize() + app.getLocalTotalSize());
appInfoVO.setHadoopFileNum(appInfoVO.getHadoopFileNum() + app.getHadoopFileNum());
appInfoVO.setHadoopTotalSize(
appInfoVO.getHadoopTotalSize() + app.getHadoopTotalSize());
appInfoVO.setTotalSize(appInfoVO.getTotalSize() + app.getTotalSize());
}
}
userToAppList.add(appInfoVO);
}
}
// Display is inverted by the submission time of the application.
Expand All @@ -109,4 +137,9 @@ private ApplicationManager getApplicationManager() {
return (ApplicationManager)
servletContext.getAttribute(ApplicationManager.class.getCanonicalName());
}

private CoordinatorServer getCoordinatorServer() {
return (CoordinatorServer)
servletContext.getAttribute(CoordinatorServer.class.getCanonicalName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ public class AppInfoVO implements Comparable<AppInfoVO> {
private long registrationTime;
private String version;
private String gitCommitId;
private long partitionNum;
private long memorySize;
private long localFileNum;
private long localTotalSize;
private long hadoopFileNum;
private long hadoopTotalSize;
private long totalSize;

@Override
public int compareTo(AppInfoVO appInfoVO) {
Expand Down
36 changes: 34 additions & 2 deletions dashboard/src/main/webapp/src/pages/ApplicationPage.vue
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,37 @@
label="GitCommitId"
min-width="180"
/>
<el-table-column prop="partitionNum" label="PartitionNum" min-width="180" />
<el-table-column
prop="memorySize"
label="MemorySize"
min-width="180"
:formatter="memFormatter"
/>
<el-table-column label="LocalFile" min-width="180">
<template v-slot="{ row }">
<div class="mb-4">
{{ row.localFileNum }}({{
memFormatter(null, null, row.localTotalSize)
}})
</div>
</template>
</el-table-column>
<el-table-column label="HadoopFile" min-width="180">
<template v-slot="{ row }">
<div class="mb-4">
{{ row.hadoopFileNum }}({{
memFormatter(null, null, row.hadoopTotalSize)
}})
</div>
</template>
</el-table-column>
<el-table-column
prop="totalSize"
label="TotalSize"
min-width="180"
:formatter="memFormatter"
/>
</el-table>
</div>
</div>
Expand All @@ -97,7 +128,7 @@
<script>
import { getApplicationInfoList, getAppTotal, getTotalForUser } from '@/api/api'
import { onMounted, reactive } from 'vue'
import { dateFormatter } from '@/utils/common'
import { dateFormatter, memFormatter } from '@/utils/common'
import { useCurrentServerStore } from '@/store/useCurrentServerStore'

export default {
Expand Down Expand Up @@ -164,7 +195,8 @@ export default {
sortAppCollectChangeEvent,
sortApp,
sortAppChangeEvent,
dateFormatter
dateFormatter,
memFormatter
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public ShuffleServerHeartBeatResponse doSendHeartBeat(
Map<String, StorageInfo> storageInfo,
int nettyPort,
int jettyPort,
long startTimeMs) {
long startTimeMs,
List<RssProtos.ApplicationInfo> appInfos) {
ShuffleServerId serverId =
ShuffleServerId.newBuilder()
.setId(id)
Expand All @@ -149,6 +150,7 @@ public ShuffleServerHeartBeatResponse doSendHeartBeat(
.setStartTimeMs(startTimeMs)
.setVersion(Constants.VERSION)
.setGitCommitId(Constants.REVISION_SHORT)
.addAllApplicationInfo(appInfos)
.build();

RssProtos.StatusCode status;
Expand Down Expand Up @@ -225,7 +227,8 @@ public RssSendHeartBeatResponse sendHeartBeat(RssSendHeartBeatRequest request) {
request.getStorageInfo(),
request.getNettyPort(),
request.getJettyPort(),
request.getStartTimeMs());
request.getStartTimeMs(),
request.getAppInfos());

RssSendHeartBeatResponse response;
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.uniffle.client.request;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.proto.RssProtos;

public class RssSendHeartBeatRequest {

Expand All @@ -39,6 +41,7 @@ public class RssSendHeartBeatRequest {
private final int nettyPort;
private final int jettyPort;
private final long startTimeMs;
private final List<RssProtos.ApplicationInfo> appInfos;

public RssSendHeartBeatRequest(
String shuffleServerId,
Expand All @@ -54,7 +57,8 @@ public RssSendHeartBeatRequest(
Map<String, StorageInfo> storageInfo,
int nettyPort,
int jettyPort,
long startTimeMs) {
long startTimeMs,
List<RssProtos.ApplicationInfo> appInfos) {
this.shuffleServerId = shuffleServerId;
this.shuffleServerIp = shuffleServerIp;
this.shuffleServerPort = shuffleServerPort;
Expand All @@ -69,6 +73,7 @@ public RssSendHeartBeatRequest(
this.nettyPort = nettyPort;
this.jettyPort = jettyPort;
this.startTimeMs = startTimeMs;
this.appInfos = appInfos;
}

public String getShuffleServerId() {
Expand Down Expand Up @@ -126,4 +131,8 @@ public int getJettyPort() {
public long getStartTimeMs() {
return startTimeMs;
}

public List<RssProtos.ApplicationInfo> getAppInfos() {
return appInfos;
}
}
12 changes: 12 additions & 0 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,17 @@ enum ServerStatus {
// todo: more status, such as UPGRADING
}

message ApplicationInfo {
string appId = 1;
int64 partitionNum = 2;
int64 memorySize = 3;
int64 localFileNum = 4;
int64 localTotalSize = 5;
int64 hadoopFileNum = 6;
int64 hadoopTotalSize = 7;
int64 totalSize = 8;
}

message ShuffleServerHeartBeatRequest {
ShuffleServerId serverId = 1;
int64 usedMemory = 2;
Expand All @@ -286,6 +297,7 @@ message ShuffleServerHeartBeatRequest {
optional string version = 22;
optional string gitCommitId = 23;
optional int64 startTimeMs = 24;
repeated ApplicationInfo applicationInfo = 25;
}

message ShuffleServerHeartBeatResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.uniffle.server;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -33,6 +34,7 @@
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.proto.RssProtos;

public class RegisterHeartBeat {

Expand Down Expand Up @@ -84,7 +86,8 @@ public void startHeartBeat() {
shuffleServer.getStorageManager().getStorageInfo(),
shuffleServer.getNettyPort(),
shuffleServer.getJettyPort(),
shuffleServer.getStartTimeMs());
shuffleServer.getStartTimeMs(),
shuffleServer.getAppInfos());
} catch (Exception e) {
LOG.warn("Error happened when send heart beat to coordinator");
}
Expand All @@ -107,7 +110,8 @@ public boolean sendHeartBeat(
Map<String, StorageInfo> localStorageInfo,
int nettyPort,
int jettyPort,
long startTimeMs) {
long startTimeMs,
List<RssProtos.ApplicationInfo> appInfos) {
// use `rss.server.heartbeat.interval` as the timeout option
RssSendHeartBeatRequest request =
new RssSendHeartBeatRequest(
Expand All @@ -124,7 +128,8 @@ public boolean sendHeartBeat(
localStorageInfo,
nettyPort,
jettyPort,
startTimeMs);
startTimeMs,
appInfos);

if (coordinatorClient.sendHeartBeat(request).getStatusCode() == StatusCode.SUCCESS) {
return true;
Expand Down
Loading