Skip to content

Commit

Permalink
[Feature]集群均衡支持LogDir级别的Disk均衡(#1164)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoli committed Oct 8, 2023
1 parent b1aa12b commit 6df36de
Show file tree
Hide file tree
Showing 13 changed files with 661 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class BalanceTask {
private int partition;
//副本分配列表
private List<Integer> replicas;
private List<String> logDirs;

public String getTopic() {
return topic;
Expand All @@ -32,12 +33,21 @@ public void setReplicas(List<Integer> replicas) {
this.replicas = replicas;
}

public List<String> getLogDirs() {
return logDirs;
}

public void setLogDirs(List<String> logDirs) {
this.logDirs = logDirs;
}

@Override
public String toString() {
return "BalanceTask{" +
"topic='" + topic + '\'' +
", partition=" + partition +
", replicas=" + replicas +
", logDirs=" + logDirs +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public List<BalanceTask> resultTask() {
task.setPartition(proposal.tp().partition());
List<Integer> replicas = proposal.newReplicas().stream().map(ReplicaPlacementInfo::brokerId).collect(Collectors.toList());
task.setReplicas(replicas);
List<String> logDirs = proposal.newReplicas().stream().map(ReplicaPlacementInfo::logdir).collect(Collectors.toList());
task.setLogDirs(logDirs);
balanceTasks.add(task);
});
return balanceTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.apache.kafka.common.TopicPartition;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand All @@ -11,19 +12,16 @@
* @date 2022/4/29
*/
public class Broker implements Comparable<Broker> {
public static final Broker NONE = new Broker(new Rack("-1"), -1, "localhost", true, new Capacity());

private final Rack rack;
private final int id;
private final String host;
private final boolean isOffline;

private final Set<Replica> replicas;
private final Set<Replica> leaderReplicas;
private final Map<String, Map<Integer, Replica>> topicReplicas;

private final Map<String, LogDir> logDirs;
private final Load load;

private final Capacity capacity;

public Broker(Rack rack, int id, String host, boolean isOffline, Capacity capacity) {
Expand All @@ -34,10 +32,15 @@ public Broker(Rack rack, int id, String host, boolean isOffline, Capacity capaci
this.replicas = new HashSet<>();
this.leaderReplicas = new HashSet<>();
this.topicReplicas = new HashMap<>();
this.logDirs = new HashMap<>();
this.load = new Load();
this.capacity = capacity;
}

public void addLogDir(LogDir logDir) {
this.logDirs.put(logDir.name(), logDir);
}

public Rack rack() {
return rack;
}
Expand All @@ -58,6 +61,10 @@ public Set<Replica> replicas() {
return Collections.unmodifiableSet(this.replicas);
}

public Map<String, LogDir> logDirs() {
return Collections.unmodifiableMap(this.logDirs);
}

public SortedSet<Replica> sortedReplicasFor(Resource resource, boolean reverse) {
return sortedReplicasFor(null, resource, reverse);
}
Expand All @@ -84,27 +91,34 @@ public Set<Replica> leaderReplicas() {
}

public Load load() {
return load;
return this.load;
}

public Capacity capacity() {
return capacity;
}


public double utilizationFor(Resource resource) {
return this.load.loadFor(resource) / this.capacity.capacityFor(resource);
}


public double expectedUtilizationAfterAdd(Resource resource, Load loadToChange) {
return (this.load.loadFor(resource) + ((loadToChange == null) ? 0 : loadToChange.loadFor(resource)))
/ this.capacity.capacityFor(resource);
}


public double expectedUtilizationAfterRemove(Resource resource, Load loadToChange) {
return (this.load.loadFor(resource) - ((loadToChange == null) ? 0 : loadToChange.loadFor(resource)))
/ this.capacity.capacityFor(resource);
}

public LogDir logDir(String name) {
return this.logDirs.get(name);
}

public Replica replica(TopicPartition topicPartition) {
Map<Integer, Replica> replicas = this.topicReplicas.get(topicPartition.topic());
if (replicas == null) {
Expand All @@ -113,7 +127,7 @@ public Replica replica(TopicPartition topicPartition) {
return replicas.get(topicPartition.partition());
}

void addReplica(Replica replica) {
void addReplica(String logDir, Replica replica) {
// Add replica to list of all replicas in the broker.
if (this.replicas.contains(replica)) {
throw new IllegalStateException(String.format("Broker %d already has replica %s", this.id,
Expand All @@ -131,9 +145,12 @@ void addReplica(Replica replica) {

// Add replica load to the broker load.
this.load.addLoad(replica.load());

// Add replica to list of replicas in the logDir
this.logDirs.get(logDir).addReplica(replica);
}

Replica removeReplica(TopicPartition topicPartition) {
Replica removeReplica(String sourceLogDir, TopicPartition topicPartition) {
Replica replica = replica(topicPartition);
if (replica != null) {
this.replicas.remove(replica);
Expand All @@ -145,6 +162,7 @@ Replica removeReplica(TopicPartition topicPartition) {
this.leaderReplicas.remove(replica);
}
this.load.subtractLoad(replica.load());
this.logDirs.get(sourceLogDir).removeReplica(topicPartition);
}
return replica;
}
Expand Down Expand Up @@ -193,6 +211,7 @@ public String toString() {
", replicas=" + replicas +
", leaderReplicas=" + leaderReplicas +
", topicReplicas=" + topicReplicas +
", logDirs=" + logDirs +
", load=" + load +
", capacity=" + capacity +
'}';
Expand All @@ -219,4 +238,8 @@ public Collection<Replica> replicasOfTopicInBroker(String topic) {
public Set<Replica> currentOfflineReplicas() {
return replicas.stream().filter(Replica::isCurrentOffline).collect(Collectors.toSet());
}

public LogDir randomLogDir() {
return logDirs.values().stream().collect(Collectors.toList()).get(ThreadLocalRandom.current().nextInt(logDirs.size()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,25 @@ public SortedSet<Broker> sortedBrokersFor(Predicate<? super Broker> filter, Reso
return sortedBrokers;
}

public SortedSet<LogDir> sortedLogDirsFor(Predicate<? super LogDir> filter, Resource resource, boolean reverse) {
Comparator<LogDir> comparator =
Comparator.<LogDir>comparingDouble(b -> b.utilizationFor(resource));
if (reverse)
comparator = comparator.reversed();

SortedSet<LogDir> sortedLogDirs = new TreeSet<>(comparator);
this.brokersById.values().stream().forEach(broker -> {
if (filter == null) {
sortedLogDirs.addAll(broker.logDirs().values());
} else {
sortedLogDirs.addAll(broker.logDirs().values().stream()
.filter(filter).collect(Collectors.toList()));
}
});

return sortedLogDirs;
}

public Load load() {
Load load = new Load();
for (Broker broker : this.brokersById.values()) {
Expand Down Expand Up @@ -101,30 +120,33 @@ public Broker broker(int brokerId) {
return this.brokersById.get(brokerId);
}

public Broker addBroker(String rackId, int brokerId, String host, boolean isOffline, Capacity capacity) {
public Broker addBroker(String rackId, int brokerId, String host, boolean isOffline, Capacity capacity, Map<String, Capacity> logDirCapacities) {
Rack rack = rack(rackId);
if (rack == null)
throw new IllegalArgumentException("Rack: " + rackId + "is not exists.");
Broker broker = new Broker(rack, brokerId, host, isOffline, capacity);
for(Map.Entry<String, Capacity> entry : logDirCapacities.entrySet()) {
broker.addLogDir(new LogDir(entry.getKey(), broker, entry.getValue()));
}
rack.addBroker(broker);
this.brokersById.put(brokerId, broker);
return broker;
}

public Replica addReplica(int brokerId, TopicPartition topicPartition, boolean isLeader, Load load) {
return addReplica(brokerId, topicPartition, isLeader, false, load);
public Replica addReplica(int brokerId, String logDir, TopicPartition topicPartition, boolean isLeader, Load load) {
return addReplica(brokerId, logDir, topicPartition, isLeader, false, load);
}

public Replica addReplica(int brokerId, TopicPartition topicPartition, boolean isLeader, boolean isOffline, Load load) {
public Replica addReplica(int brokerId, String logDir, TopicPartition topicPartition, boolean isLeader, boolean isOffline, Load load) {
Broker broker = broker(brokerId);
if (broker == null) {
throw new IllegalArgumentException("Broker: " + brokerId + "is not exists.");
}

Replica replica = new Replica(broker, topicPartition, isLeader, isOffline);
Replica replica = new Replica(broker, logDir, topicPartition, isLeader, isOffline);
replica.setLoad(load);
// add to broker
broker.addReplica(replica);
broker.addReplica(logDir, replica);

Map<TopicPartition, Partition> partitions = this.partitionsByTopic
.computeIfAbsent(topicPartition.topic(), k -> new HashMap<>());
Expand All @@ -139,9 +161,9 @@ public Replica addReplica(int brokerId, TopicPartition topicPartition, boolean i
return replica;
}

public Replica removeReplica(int brokerId, TopicPartition topicPartition) {
public Replica removeReplica(int brokerId, String sourceLogDir, TopicPartition topicPartition) {
Broker broker = broker(brokerId);
return broker.removeReplica(topicPartition);
return broker.removeReplica(sourceLogDir, topicPartition);
}

public void relocateLeadership(String goal, String actionType, TopicPartition topicPartition, int sourceBrokerId, int destinationBrokerId) {
Expand Down Expand Up @@ -171,19 +193,20 @@ public void relocateLeadership(TopicPartition topicPartition, int sourceBrokerId
partition.relocateLeadership(destinationReplica);
}

public void relocateReplica(String goal, String actionType, TopicPartition topicPartition, int sourceBrokerId, int destinationBrokerId) {
relocateReplica(topicPartition, sourceBrokerId, destinationBrokerId);
public void relocateReplica(String goal, String actionType, TopicPartition topicPartition, int sourceBrokerId, String sourceLogDir, int destinationBrokerId, String destinationLogDir) {
relocateReplica(topicPartition, sourceBrokerId, sourceLogDir, destinationBrokerId, destinationLogDir);
addBalanceActionHistory(goal, actionType, topicPartition, sourceBrokerId, destinationBrokerId);
}

public void relocateReplica(TopicPartition topicPartition, int sourceBrokerId, int destinationBrokerId) {
Replica replica = removeReplica(sourceBrokerId, topicPartition);
public void relocateReplica(TopicPartition topicPartition, int sourceBrokerId, String sourceLogDir, int destinationBrokerId, String destinationLogDir) {
Replica replica = removeReplica(sourceBrokerId, sourceLogDir, topicPartition);
if (replica == null) {
throw new IllegalArgumentException("Replica is not in the cluster.");
}
Broker destinationBroker = broker(destinationBrokerId);
replica.setBroker(destinationBroker);
destinationBroker.addReplica(replica);
replica.setLogDir(destinationLogDir);
destinationBroker.addReplica(destinationLogDir, replica);
}

private void addBalanceActionHistory(String goal, String actionType, TopicPartition topicPartition, int sourceBrokerId, int destinationBrokerId) {
Expand All @@ -208,7 +231,7 @@ public Map<TopicPartition, List<ReplicaPlacementInfo>> getReplicaDistribution()
for (Map<TopicPartition, Partition> tp : partitionsByTopic.values()) {
tp.values().forEach(i -> {
i.replicas().forEach(j -> replicaDistribution.computeIfAbsent(j.topicPartition(), k -> new ArrayList<>())
.add(new ReplicaPlacementInfo(j.broker().id(), "")));
.add(new ReplicaPlacementInfo(j.broker().id(), j.logDir())));
});
}
return replicaDistribution;
Expand All @@ -221,7 +244,7 @@ public Replica partition(TopicPartition tp) {
public Map<TopicPartition, ReplicaPlacementInfo> getLeaderDistribution() {
Map<TopicPartition, ReplicaPlacementInfo> leaderDistribution = new HashMap<>();
for (Broker broker : brokersById.values()) {
broker.leaderReplicas().forEach(i -> leaderDistribution.put(i.topicPartition(), new ReplicaPlacementInfo(broker.id(), "")));
broker.leaderReplicas().forEach(i -> leaderDistribution.put(i.topicPartition(), new ReplicaPlacementInfo(broker.id(), i.logDir())));
}
return leaderDistribution;
}
Expand Down
Loading

0 comments on commit 6df36de

Please sign in to comment.