Skip to content

Commit

Permalink
[controller] Log improvement for inc push status report (#759)
Browse files Browse the repository at this point in the history
[controller] Log improvement for inc push status report

Today, in some cases, controller doesn't report the details of unfinished partitions if partition
ingestion has issues and makes the debugging work laborious.

This change adds log lines for such cases and combines existing inc push status logs to make it concise.
  • Loading branch information
lluwm authored Nov 17, 2023
1 parent aed7bfb commit 0e6ca73
Showing 1 changed file with 36 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -395,17 +396,46 @@ private ExecutionStatus checkIncrementalPushStatus(
String incrementalPushVersion,
int partitionCount,
int replicationFactor) {

class IncPushPartitionStates {
private static final int UNKNOWN = -1;
private int partitionId;
private int minRequiredReplicationFactor;
private int numOfReplicasWithEoip;

private IncPushPartitionStates(int partitionId, int minRequiredReplicationFactor, int numOfReplicasWithEoip) {
this.partitionId = partitionId;
this.minRequiredReplicationFactor = minRequiredReplicationFactor;
this.numOfReplicasWithEoip = numOfReplicasWithEoip;
}

private IncPushPartitionStates(int partitionId) {
this(partitionId, UNKNOWN, UNKNOWN);
}

@Override
public String toString() {
return String.format(
"(%s, %s, %s)",
partitionId,
minRequiredReplicationFactor == UNKNOWN ? "U" : minRequiredReplicationFactor,
numOfReplicasWithEoip == UNKNOWN ? "U" : numOfReplicasWithEoip);
}
}

// when push status map is null or empty means that given incremental push hasn't been created/started yet
if (pushStatusMap == null || pushStatusMap.isEmpty()) {
return NOT_CREATED;
}
int numberOfPartitionsWithEnoughEoipReceivedReplicas = 0;
boolean isIncrementalPushStatusAvailableForAtLeastOneReplica = false;

List<IncPushPartitionStates> unFinishedPartitions = new LinkedList<>();
for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
Map<CharSequence, Integer> replicaStatusMap = pushStatusMap.get(partitionId);
// inc push status of replicas of this partition is not available yet
if (replicaStatusMap == null || replicaStatusMap.isEmpty()) {
unFinishedPartitions.add(new IncPushPartitionStates(partitionId));
continue;
}

Expand All @@ -427,25 +457,21 @@ private ExecutionStatus checkIncrementalPushStatus(
if (numberOfReplicasWithEoipStatus >= minRequiredReplicationFactor) {
numberOfPartitionsWithEnoughEoipReceivedReplicas++;
} else {
LOGGER.info(
"For partitionId {} need {} replicas to acknowledge the delivery of EOIP but got only {}. "
+ "kafkaTopic:{} incrementalPushVersion:{}",
partitionId,
minRequiredReplicationFactor,
numberOfReplicasWithEoipStatus,
kafkaTopic,
incrementalPushVersion);
unFinishedPartitions
.add(new IncPushPartitionStates(partitionId, minRequiredReplicationFactor, numberOfReplicasWithEoipStatus));
}
}
if (numberOfPartitionsWithEnoughEoipReceivedReplicas == partitionCount) {
return END_OF_INCREMENTAL_PUSH_RECEIVED;
}
LOGGER.info(
"Only {} out of {} partitions are sufficiently replicated. kafkaTopic:{} incrementalPushVersion:{}",
"{} out of {} partitions are sufficiently replicated, kafkaTopic: {}, incrementalPushVersion: {}, unfinished partitions (partitionId, minRequired, No. of EOIP replicas): {}, size: {}",
numberOfPartitionsWithEnoughEoipReceivedReplicas,
partitionCount,
kafkaTopic,
incrementalPushVersion);
incrementalPushVersion,
unFinishedPartitions,
unFinishedPartitions.size());

// to report SOIP at least one replica should have seen either SOIP or EOIP
if (isIncrementalPushStatusAvailableForAtLeastOneReplica) {
Expand Down

0 comments on commit 0e6ca73

Please sign in to comment.