Skip to content

Commit

Permalink
[MINOR] feat(server,dashboard,coordinator): Report configured metrics…
Browse files Browse the repository at this point in the history
… of server to coordinator and display to dashboard (#2239)

### What changes were proposed in this pull request?

Report the server's configured metrics to the coordinator and display them in the dashboard.

### Why are the changes needed?

This feature makes the dashboard's display of server state extensible.

In the beginning, we only needed to display the app number in the dashboard. A few days later, we wanted to add partition-with-node information to the dashboard, and not long after that we may need yet another value displayed in the dashboard.

Adding these values one by one is not extensible, which is why I introduced this feature: it lets users configure the metrics they consider necessary to display in the dashboard.

### Does this PR introduce _any_ user-facing change?

- new config: rss.server.displayMetricsList


### How was this patch tested?

Tested locally.

<img width="288" alt="image" src="https://github.com/user-attachments/assets/a10fdbe4-08fa-4ccb-8614-80b2447b30f9">
  • Loading branch information
maobaolong authored Nov 19, 2024
1 parent c1bfa04 commit 910823d
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,12 @@ public void unregisterSupplierGauge(String name) {
supplierGaugeMap.remove(name);
}
}

/**
 * Returns the default label names held by this instance.
 *
 * <p>The stored array reference itself is returned; no copy is made.
 *
 * @return the {@code defaultLabelNames} array
 */
public String[] getDefaultLabelNames() {
  final String[] labelNames = defaultLabelNames;
  return labelNames;
}

/**
 * Returns the default label values held by this instance.
 *
 * <p>The stored array reference itself is returned; no copy is made.
 *
 * @return the {@code defaultLabelValues} array
 */
public String[] getDefaultLabelValues() {
  final String[] labelValues = defaultLabelValues;
  return labelValues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,8 @@ private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) {
request.getStartTimeMs(),
request.getVersion(),
request.getGitCommitId(),
request.getApplicationInfoList());
request.getApplicationInfoList(),
request.getDisplayMetricsMap());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

public class ServerNode implements Comparable<ServerNode> {

private final Map<String, String> displayMetrics;
private String id;
private String ip;
private int grpcPort;
Expand Down Expand Up @@ -187,7 +188,8 @@ public ServerNode(
startTime,
"",
"",
Collections.EMPTY_LIST);
Collections.EMPTY_LIST,
Collections.EMPTY_MAP);
}

public ServerNode(
Expand All @@ -206,7 +208,8 @@ public ServerNode(
long startTime,
String version,
String gitCommitId,
List<RssProtos.ApplicationInfo> appInfos) {
List<RssProtos.ApplicationInfo> appInfos,
Map<String, String> displayMetrics) {
this.id = id;
this.ip = ip;
this.grpcPort = grpcPort;
Expand All @@ -230,6 +233,7 @@ public ServerNode(
this.gitCommitId = gitCommitId;
this.appIdToInfos = new ConcurrentHashMap<>();
appInfos.forEach(appInfo -> appIdToInfos.put(appInfo.getAppId(), appInfo));
this.displayMetrics = displayMetrics;
}

public ShuffleServerId convertToGrpcProto() {
Expand Down Expand Up @@ -388,4 +392,8 @@ public String getGitCommitId() {
public Map<String, RssProtos.ApplicationInfo> getAppIdToInfos() {
return appIdToInfos;
}

/**
 * Returns the display metrics this node was constructed with.
 *
 * <p>The map passed to the constructor is returned as-is (no copy, no null substitution).
 *
 * @return the {@code displayMetrics} map of this server node
 */
public Map<String, String> getDisplayMetrics() {
  final Map<String, String> metrics = displayMetrics;
  return metrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@
</el-button>
</template>
</el-table-column>
<el-table-column prop="displayMetrics" label="DisplayMetrics" min-width="120">
<template v-slot="{ row }">
<div v-for="(value, key) in row.displayMetrics" :key="key">
<span>{{ key }}:{{ value }}</span>
</div>
</template>
</el-table-column>
</el-table>
</div>
</template>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@
import org.apache.uniffle.client.response.RssSendHeartBeatResponse;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageInfoUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.proto.CoordinatorServerGrpc;
Expand Down Expand Up @@ -113,51 +111,40 @@ public GetShuffleServerListResponse getShuffleServerList() {
return blockingStub.getShuffleServerList(Empty.newBuilder().build());
}

public ShuffleServerHeartBeatResponse doSendHeartBeat(
String id,
String ip,
int port,
long usedMemory,
long preAllocatedMemory,
long availableMemory,
int eventNumInFlush,
long timeout,
Set<String> tags,
ServerStatus serverStatus,
Map<String, StorageInfo> storageInfo,
int nettyPort,
int jettyPort,
long startTimeMs,
List<RssProtos.ApplicationInfo> appInfos) {
public ShuffleServerHeartBeatResponse doSendHeartBeat(RssSendHeartBeatRequest rssRequest) {
ShuffleServerId serverId =
ShuffleServerId.newBuilder()
.setId(id)
.setIp(ip)
.setPort(port)
.setNettyPort(nettyPort)
.setJettyPort(jettyPort)
.setId(rssRequest.getShuffleServerId())
.setIp(rssRequest.getShuffleServerIp())
.setPort(rssRequest.getShuffleServerPort())
.setNettyPort(rssRequest.getNettyPort())
.setJettyPort(rssRequest.getJettyPort())
.build();
ShuffleServerHeartBeatRequest request =
ShuffleServerHeartBeatRequest.newBuilder()
.setServerId(serverId)
.setUsedMemory(usedMemory)
.setPreAllocatedMemory(preAllocatedMemory)
.setAvailableMemory(availableMemory)
.setEventNumInFlush(eventNumInFlush)
.addAllTags(tags)
.setStatusValue(serverStatus.ordinal())
.putAllStorageInfo(StorageInfoUtils.toProto(storageInfo))
.setStartTimeMs(startTimeMs)
.setUsedMemory(rssRequest.getUsedMemory())
.setPreAllocatedMemory(rssRequest.getPreAllocatedMemory())
.setAvailableMemory(rssRequest.getAvailableMemory())
.setEventNumInFlush(rssRequest.getEventNumInFlush())
.addAllTags(rssRequest.getTags())
.setStatusValue(rssRequest.getServerStatus().ordinal())
.putAllStorageInfo(StorageInfoUtils.toProto(rssRequest.getStorageInfo()))
.setStartTimeMs(rssRequest.getStartTimeMs())
.setVersion(Constants.VERSION)
.setGitCommitId(Constants.REVISION_SHORT)
.addAllApplicationInfo(appInfos)
.addAllApplicationInfo(rssRequest.getAppInfos())
.putAllDisplayMetrics(rssRequest.getDisplayMetrics())
.build();

RssProtos.StatusCode status;
ShuffleServerHeartBeatResponse response = null;

try {
response = blockingStub.withDeadlineAfter(timeout, TimeUnit.MILLISECONDS).heartbeat(request);
response =
blockingStub
.withDeadlineAfter(rssRequest.getTimeout(), TimeUnit.MILLISECONDS)
.heartbeat(request);
status = response.getStatus();
} catch (StatusRuntimeException e) {
LOG.error("Failed to doSendHeartBeat, request: {}", request, e);
Expand Down Expand Up @@ -212,23 +199,7 @@ public RssProtos.GetShuffleAssignmentsResponse doGetShuffleAssignments(

@Override
public RssSendHeartBeatResponse sendHeartBeat(RssSendHeartBeatRequest request) {
ShuffleServerHeartBeatResponse rpcResponse =
doSendHeartBeat(
request.getShuffleServerId(),
request.getShuffleServerIp(),
request.getShuffleServerPort(),
request.getUsedMemory(),
request.getPreAllocatedMemory(),
request.getAvailableMemory(),
request.getEventNumInFlush(),
request.getTimeout(),
request.getTags(),
request.getServerStatus(),
request.getStorageInfo(),
request.getNettyPort(),
request.getJettyPort(),
request.getStartTimeMs(),
request.getAppInfos());
ShuffleServerHeartBeatResponse rpcResponse = doSendHeartBeat(request);

RssSendHeartBeatResponse response;
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.uniffle.client.request;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -42,6 +43,7 @@ public class RssSendHeartBeatRequest {
private final int jettyPort;
private final long startTimeMs;
private final List<RssProtos.ApplicationInfo> appInfos;
private final Map<String, String> displayMetrics;

public RssSendHeartBeatRequest(
String shuffleServerId,
Expand All @@ -58,7 +60,8 @@ public RssSendHeartBeatRequest(
int nettyPort,
int jettyPort,
long startTimeMs,
List<RssProtos.ApplicationInfo> appInfos) {
List<RssProtos.ApplicationInfo> appInfos,
Map<String, String> displayMetrics) {
this.shuffleServerId = shuffleServerId;
this.shuffleServerIp = shuffleServerIp;
this.shuffleServerPort = shuffleServerPort;
Expand All @@ -74,6 +77,7 @@ public RssSendHeartBeatRequest(
this.jettyPort = jettyPort;
this.startTimeMs = startTimeMs;
this.appInfos = appInfos;
this.displayMetrics = displayMetrics;
}

public String getShuffleServerId() {
Expand Down Expand Up @@ -135,4 +139,8 @@ public long getStartTimeMs() {
public List<RssProtos.ApplicationInfo> getAppInfos() {
return appInfos;
}

/**
 * Returns the display metrics carried by this heartbeat request.
 *
 * @return the {@code displayMetrics} map given at construction, or an empty map when that map
 *     is {@code null}; never {@code null}
 */
public Map<String, String> getDisplayMetrics() {
  if (displayMetrics != null) {
    return displayMetrics;
  }
  // Callers get an empty map rather than null when no metrics were supplied.
  return Collections.emptyMap();
}
}
1 change: 1 addition & 0 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ message ShuffleServerHeartBeatRequest {
optional string gitCommitId = 23;
optional int64 startTimeMs = 24;
repeated ApplicationInfo applicationInfo = 25;
map<string, string> displayMetrics = 26;
}

message ShuffleServerHeartBeatResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.uniffle.server;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand All @@ -30,11 +27,8 @@
import org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
import org.apache.uniffle.client.request.RssSendHeartBeatRequest;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.proto.RssProtos;

public class RegisterHeartBeat {

Expand Down Expand Up @@ -73,21 +67,26 @@ public void startHeartBeat() {
Runnable runnable =
() -> {
try {
sendHeartBeat(
shuffleServer.getId(),
shuffleServer.getIp(),
shuffleServer.getGrpcPort(),
shuffleServer.getUsedMemory(),
shuffleServer.getPreAllocatedMemory(),
shuffleServer.getAvailableMemory(),
shuffleServer.getEventNumInFlush(),
shuffleServer.getTags(),
shuffleServer.getServerStatus(),
shuffleServer.getStorageManager().getStorageInfo(),
shuffleServer.getNettyPort(),
shuffleServer.getJettyPort(),
shuffleServer.getStartTimeMs(),
shuffleServer.getAppInfos());
// use `rss.server.heartbeat.interval` as the timeout option
RssSendHeartBeatRequest request =
new RssSendHeartBeatRequest(
shuffleServer.getId(),
shuffleServer.getIp(),
shuffleServer.getGrpcPort(),
shuffleServer.getUsedMemory(),
shuffleServer.getPreAllocatedMemory(),
shuffleServer.getAvailableMemory(),
shuffleServer.getEventNumInFlush(),
heartBeatInterval,
shuffleServer.getTags(),
shuffleServer.getServerStatus(),
shuffleServer.getStorageManager().getStorageInfo(),
shuffleServer.getNettyPort(),
shuffleServer.getJettyPort(),
shuffleServer.getStartTimeMs(),
shuffleServer.getAppInfos(),
shuffleServer.getDisplayMetrics());
sendHeartBeat(request);
} catch (Exception e) {
LOG.warn("Error happened when send heart beat to coordinator");
}
Expand All @@ -97,46 +96,17 @@ public void startHeartBeat() {
}

@VisibleForTesting
public boolean sendHeartBeat(
String id,
String ip,
int grpcPort,
long usedMemory,
long preAllocatedMemory,
long availableMemory,
int eventNumInFlush,
Set<String> tags,
ServerStatus serverStatus,
Map<String, StorageInfo> localStorageInfo,
int nettyPort,
int jettyPort,
long startTimeMs,
List<RssProtos.ApplicationInfo> appInfos) {
// use `rss.server.heartbeat.interval` as the timeout option
RssSendHeartBeatRequest request =
new RssSendHeartBeatRequest(
id,
ip,
grpcPort,
usedMemory,
preAllocatedMemory,
availableMemory,
eventNumInFlush,
heartBeatInterval,
tags,
serverStatus,
localStorageInfo,
nettyPort,
jettyPort,
startTimeMs,
appInfos);

public boolean sendHeartBeat(RssSendHeartBeatRequest request) {
  // Forward the request through the coordinator client and report whether the
  // returned status code is SUCCESS.
  final StatusCode resultCode = coordinatorClient.sendHeartBeat(request).getStatusCode();
  return resultCode == StatusCode.SUCCESS;
}

/**
 * Returns the heartbeat interval held by this instance.
 *
 * @return the value of {@code heartBeatInterval}
 */
public long getHeartBeatInterval() {
  final long interval = heartBeatInterval;
  return interval;
}

public void shutdown() {
coordinatorClient.close();
service.shutdownNow();
Expand Down
Loading

0 comments on commit 910823d

Please sign in to comment.