diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
index 68c36ad0c..9996bf72e 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
@@ -26,10 +26,6 @@
* Performs partition assignment based on partition throughput information
*/
public class LoadBasedPartitionAssigner {
- // TODO: move these to config class
- private static final Integer DEFAULT_KB_RATE = 5;
- private static final Integer DEFAULT_MESSAGE_RATE = 5;
-
/**
* Performs partition assignment based on partition throughput information.
*
@@ -61,7 +57,9 @@ public Map> assignPartitions(ClusterThroughputInfo t
// sort the current assignment's tasks on total throughput
Map taskThroughputMap = new HashMap<>();
- PartitionThroughputInfo defaultPartitionInfo = new PartitionThroughputInfo(DEFAULT_KB_RATE, DEFAULT_MESSAGE_RATE, "");
+ PartitionThroughputInfo defaultPartitionInfo = new PartitionThroughputInfo(
+ PartitionAssignmentStrategyConfig.PARTITION_BYTES_IN_KB_RATE_DEFAULT,
+ PartitionAssignmentStrategyConfig.PARTITION_MESSAGES_IN_RATE_DEFAULT, "");
newPartitions.forEach((task, partitions) -> {
int totalThroughput = partitions.stream()
.mapToInt(p -> partitionInfoMap.getOrDefault(p, defaultPartitionInfo).getBytesInKBRate())
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java
index 3439d9d2b..315c3e3e3 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java
@@ -37,9 +37,8 @@
public class LoadBasedPartitionAssignmentStrategy extends StickyPartitionAssignmentStrategy {
private static final Logger LOG = LoggerFactory.getLogger(LoadBasedPartitionAssignmentStrategy.class.getName());
- // TODO Make these constants configurable
- private static final long THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT = Duration.ofSeconds(10).toMillis();
- private static final long THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT = Duration.ofSeconds(1).toMillis();
+ private static final int THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT = (int) Duration.ofSeconds(10).toMillis();
+ private static final int THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT = (int) Duration.ofSeconds(1).toMillis();
private static final int TASK_CAPACITY_MBPS_DEFAULT = 4;
private static final int TASK_CAPACITY_UTILIZATION_PCT_DEFAULT = 90;
@@ -48,6 +47,8 @@ public class LoadBasedPartitionAssignmentStrategy extends StickyPartitionAssignm
private final DatastreamSourceClusterResolver _sourceClusterResolver;
private final int _taskCapacityMBps;
private final int _taskCapacityUtilizationPct;
+ private final int _throughputInfoFetchTimeoutMs;
+ private final int _throughputInfoFetchRetryPeriodMs;
/**
@@ -57,7 +58,9 @@ public LoadBasedPartitionAssignmentStrategy(PartitionThroughputProvider throughp
DatastreamSourceClusterResolver sourceClusterResolver, Optional maxTasks,
Optional imbalanceThreshold, Optional maxPartitionPerTask, boolean enableElasticTaskAssignment,
Optional partitionsPerTask, Optional partitionFullnessFactorPct,
- Optional taskCapacityMBps, Optional taskCapacityUtilizationPct, Optional zkClient,
+ Optional taskCapacityMBps, Optional taskCapacityUtilizationPct,
+ Optional throughputInfoFetchTimeoutMs, Optional throughputInfoFetchRetryPeriodMs,
+ Optional zkClient,
String clusterName) {
super(maxTasks, imbalanceThreshold, maxPartitionPerTask, enableElasticTaskAssignment, partitionsPerTask,
partitionFullnessFactorPct, zkClient, clusterName);
@@ -65,6 +68,9 @@ public LoadBasedPartitionAssignmentStrategy(PartitionThroughputProvider throughp
_sourceClusterResolver = sourceClusterResolver;
_taskCapacityMBps = taskCapacityMBps.orElse(TASK_CAPACITY_MBPS_DEFAULT);
_taskCapacityUtilizationPct = taskCapacityUtilizationPct.orElse(TASK_CAPACITY_UTILIZATION_PCT_DEFAULT);
+ _throughputInfoFetchTimeoutMs = throughputInfoFetchTimeoutMs.orElse(THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT);
+ _throughputInfoFetchRetryPeriodMs = throughputInfoFetchRetryPeriodMs.
+ orElse(THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT);
}
@Override
@@ -123,7 +129,7 @@ private Map fetchPartitionThroughputInfo() {
LOG.warn("Failed to fetch partition throughput info.");
return null;
}
- }, Objects::nonNull, THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT, THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT)
+ }, Objects::nonNull, _throughputInfoFetchRetryPeriodMs, _throughputInfoFetchTimeoutMs)
.orElseThrow(RetriesExhaustedException::new);
}
}
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategyFactory.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategyFactory.java
index 43bfef91a..cbe9fc209 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategyFactory.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategyFactory.java
@@ -45,7 +45,8 @@ public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties
return new LoadBasedPartitionAssignmentStrategy(provider, clusterResolver, _config.getMaxTasks(),
_config.getImbalanceThreshold(), _config.getMaxPartitions(), enableElasticTaskAssignment,
_config.getPartitionsPerTask(), _config.getPartitionFullnessThresholdPct(), _config.getTaskCapacityMBps(),
- _config.getTaskCapacityUtilizationPct(), zkClient, _config.getCluster());
+ _config.getTaskCapacityUtilizationPct(), _config.getThroughputInfoFetchTimeoutMs(),
+ _config.getThroughputInfoFetchRetryPeriodMs(), zkClient, _config.getCluster());
}
protected PartitionThroughputProvider constructPartitionThroughputProvider() {
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java
index 4ff08b5e4..813d5bc5b 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java
@@ -27,10 +27,6 @@
*/
public class LoadBasedTaskCountEstimator {
private static final Logger LOG = LoggerFactory.getLogger(LoadBasedTaskCountEstimator.class.getName());
- // TODO Move these to config class
- private static final int BYTES_IN_KB_RATE_DEFAULT = 5;
- private static final int MESSAGES_IN_RATE_DEFAULT = 5;
-
private final int _taskCapacityMBps;
private final int _taskCapacityUtilizationPct;
@@ -64,8 +60,9 @@ public int getTaskCount(ClusterThroughputInfo throughputInfo, List assig
Set allPartitions = new HashSet<>(assignedPartitions);
allPartitions.addAll(unassignedPartitions);
- PartitionThroughputInfo defaultThroughputInfo = new PartitionThroughputInfo(BYTES_IN_KB_RATE_DEFAULT,
- MESSAGES_IN_RATE_DEFAULT, "");
+ PartitionThroughputInfo defaultThroughputInfo = new PartitionThroughputInfo(
+ PartitionAssignmentStrategyConfig.PARTITION_BYTES_IN_KB_RATE_DEFAULT,
+ PartitionAssignmentStrategyConfig.PARTITION_MESSAGES_IN_RATE_DEFAULT, "");
// total throughput in KB/sec
int totalThroughput = allPartitions.stream()
.map(p -> throughputMap.getOrDefault(p, defaultThroughputInfo))
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/PartitionAssignmentStrategyConfig.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/PartitionAssignmentStrategyConfig.java
index 4fe5b21d2..faf8285d7 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/PartitionAssignmentStrategyConfig.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/PartitionAssignmentStrategyConfig.java
@@ -22,6 +22,8 @@ public final class PartitionAssignmentStrategyConfig {
public static final String CFG_MAX_PARTITION_PER_TASK = "maxPartitionsPerTask";
public static final String CFG_PARTITIONS_PER_TASK = "partitionsPerTask";
public static final String CFG_PARTITION_FULLNESS_THRESHOLD_PCT = "partitionFullnessThresholdPct";
+ public static final String CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS = "throughputInfoFetchTimeoutMs";
+ public static final String CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS = "throughputInfoFetchRetryPeriodMs";
public static final String CFG_TASK_CAPACITY_MBPS = "taskCapacityMBps";
public static final String CFG_TASK_CAPACITY_UTILIZATION_PCT = "taskCapacityUtilizationPct";
public static final String CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT = "enableElasticTaskAssignment";
@@ -30,6 +32,9 @@ public final class PartitionAssignmentStrategyConfig {
public static final String CFG_ZK_SESSION_TIMEOUT = "zkSessionTimeout";
public static final String CFG_ZK_CONNECTION_TIMEOUT = "zkConnectionTimeout";
+ public static final int PARTITION_BYTES_IN_KB_RATE_DEFAULT = 5;
+ public static final int PARTITION_MESSAGES_IN_RATE_DEFAULT = 5;
+
public static final boolean DEFAULT_ENABLE_ELASTIC_TASK_ASSIGNMENT = false;
private final Properties _config;
@@ -40,6 +45,8 @@ public final class PartitionAssignmentStrategyConfig {
private final Optional _partitionFullnessThresholdPct;
private final Optional _taskCapacityMBps;
private final Optional _taskCapacityUtilizationPct;
+ private final Optional _throughputInfoFetchTimeoutMs;
+ private final Optional _throughputInfoFetchRetryPeriodMs;
private final String _cluster;
private final String _zkAddress;
private final int _zkSessionTimeout;
@@ -60,6 +67,8 @@ public PartitionAssignmentStrategyConfig(Properties config) {
int cfgPartitionFullnessThresholdPct = props.getIntInRange(CFG_PARTITION_FULLNESS_THRESHOLD_PCT, 0, 0, 100);
int cfgTaskCapacityMBps = props.getInt(CFG_TASK_CAPACITY_MBPS, 0);
int cfgTaskCapacityUtilizationPct = props.getIntInRange(CFG_TASK_CAPACITY_UTILIZATION_PCT, 0, 0, 100);
+ int cfgThroughputInfoFetchTimeoutMs = props.getInt(CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS, 0);
+ int cfgThroughputInfoFetchRetryPeriodMs = props.getInt(CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS, 0);
// Set to Optional.empty() if the value is 0
_maxTasks = cfgMaxTasks > 0 ? Optional.of(cfgMaxTasks) : Optional.empty();
@@ -74,6 +83,10 @@ public PartitionAssignmentStrategyConfig(Properties config) {
_taskCapacityMBps = cfgTaskCapacityMBps > 0 ? Optional.of(cfgTaskCapacityMBps) : Optional.empty();
_taskCapacityUtilizationPct = cfgTaskCapacityUtilizationPct > 0 ? Optional.of(cfgTaskCapacityUtilizationPct) :
Optional.empty();
+ _throughputInfoFetchTimeoutMs = cfgThroughputInfoFetchTimeoutMs > 0 ?
+ Optional.of(cfgThroughputInfoFetchTimeoutMs) : Optional.empty();
+ _throughputInfoFetchRetryPeriodMs = cfgThroughputInfoFetchRetryPeriodMs > 0 ?
+ Optional.of(cfgThroughputInfoFetchRetryPeriodMs) : Optional.empty();
_cluster = props.getString(CFG_CLUSTER_NAME, null);
_zkAddress = props.getString(CFG_ZK_ADDRESS, null);
_zkSessionTimeout = props.getInt(CFG_ZK_SESSION_TIMEOUT, ZkClient.DEFAULT_SESSION_TIMEOUT);
@@ -136,6 +149,22 @@ public Optional getTaskCapacityUtilizationPct() {
return _taskCapacityUtilizationPct;
}
+ /**
+ * Gets throughput info fetch timeout in milliseconds
+ * @return Throughput info fetch timeout in milliseconds
+ */
+ public Optional getThroughputInfoFetchTimeoutMs() {
+ return _throughputInfoFetchTimeoutMs;
+ }
+
+ /**
+ * Gets the throughput info fetch retry period in milliseconds
+ * @return Throughput info fetch retry period in milliseconds
+ */
+ public Optional getThroughputInfoFetchRetryPeriodMs() {
+ return _throughputInfoFetchRetryPeriodMs;
+ }
+
/**
* Gets cluster
* @return Cluster
diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestPartitionAssignmentStrategyConfig.java b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestPartitionAssignmentStrategyConfig.java
index 57fe0c37a..2da5d8dd8 100644
--- a/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestPartitionAssignmentStrategyConfig.java
+++ b/datastream-server/src/test/java/com/linkedin/datastream/server/assignment/TestPartitionAssignmentStrategyConfig.java
@@ -25,6 +25,10 @@ public class TestPartitionAssignmentStrategyConfig {
private static final String CFG_MAX_TASKS_VALUE = "20";
private static final String CFG_PARTITIONS_PER_TASK_VALUE = "15";
private static final String CFG_PARTITIONS_FULLNESS_THRESHOLD_PCT_VALUE = "75";
+ private static final String CFG_TASK_CAPACITY_MBPS_VALUE = "5";
+ private static final String CFG_TASK_CAPACITY_UTILIZATION_PCT_VALUE = "82";
+ private static final String CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS_VALUE = "1100";
+ private static final String CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_VALUE = "1200";
private static final String CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT_VALUE = "true";
private static final String CFG_ZK_ADDRESS_VALUE = "dummyZk";
private static final String CFG_ZK_SESSION_TIMEOUT_VALUE = "1000";
@@ -42,6 +46,13 @@ public void configValuesCorrectlyAssignedTest() {
props.setProperty(PartitionAssignmentStrategyConfig.CFG_PARTITIONS_PER_TASK, CFG_PARTITIONS_PER_TASK_VALUE);
props.setProperty(PartitionAssignmentStrategyConfig.CFG_PARTITION_FULLNESS_THRESHOLD_PCT,
CFG_PARTITIONS_FULLNESS_THRESHOLD_PCT_VALUE);
+ props.setProperty(PartitionAssignmentStrategyConfig.CFG_TASK_CAPACITY_MBPS, CFG_TASK_CAPACITY_MBPS_VALUE);
+ props.setProperty(PartitionAssignmentStrategyConfig.CFG_TASK_CAPACITY_UTILIZATION_PCT,
+ CFG_TASK_CAPACITY_UTILIZATION_PCT_VALUE);
+ props.setProperty(PartitionAssignmentStrategyConfig.CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS,
+ CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS_VALUE);
+ props.setProperty(PartitionAssignmentStrategyConfig.CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS,
+ CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_VALUE);
props.setProperty(PartitionAssignmentStrategyConfig.CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT,
CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT_VALUE);
props.setProperty(PartitionAssignmentStrategyConfig.CFG_ZK_ADDRESS, CFG_ZK_ADDRESS_VALUE);
@@ -56,6 +67,13 @@ public void configValuesCorrectlyAssignedTest() {
Assert.assertEquals(config.getMaxPartitions(), Optional.of(Integer.parseInt(CFG_MAX_PARTITION_PER_TASK_VALUE)));
Assert.assertEquals(config.getPartitionFullnessThresholdPct(),
Optional.of(Integer.parseInt(CFG_PARTITIONS_FULLNESS_THRESHOLD_PCT_VALUE)));
+ Assert.assertEquals(config.getTaskCapacityMBps(), Optional.of(Integer.parseInt(CFG_TASK_CAPACITY_MBPS_VALUE)));
+ Assert.assertEquals(config.getTaskCapacityUtilizationPct(),
+ Optional.of(Integer.parseInt(CFG_TASK_CAPACITY_UTILIZATION_PCT_VALUE)));
+ Assert.assertEquals(config.getThroughputInfoFetchTimeoutMs(),
+ Optional.of(Integer.parseInt(CFG_THROUGHPUT_INFO_FETCH_TIMEOUT_MS_VALUE)));
+ Assert.assertEquals(config.getThroughputInfoFetchRetryPeriodMs(),
+ Optional.of(Integer.parseInt(CFG_THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_VALUE)));
Assert.assertEquals(config.isElasticTaskAssignmentEnabled(),
Boolean.parseBoolean(CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT_VALUE));
Assert.assertEquals(config.getZkAddress(), CFG_ZK_ADDRESS_VALUE);