Skip to content

Commit

Permalink
[controller] add logs for push job status (#1225)
Browse files Browse the repository at this point in the history
Emit logs for push job status whenever push job metrics are emitted
  • Loading branch information
m-nagarajan authored Oct 9, 2024
1 parent 6901820 commit fe01543
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1199,48 +1199,70 @@ static boolean isPushJobFailedDueToUserError(

/**
 * Records a push-job success/failure metric and logs a one-line summary once the job's most
 * recent overall status is terminal. Does nothing when no overall status has been reported.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers. Where a
 * statement shows up twice in a row, once using {@code pushJobDetails} and once using
 * {@code pushJobDetailsValue}, the first looks like the removed line and the second its
 * replacement.
 *
 * @param pushJobStatusStatsMap stats objects, looked up by the cluster name taken from the value
 * @param pushJobDetailsKey supplies the store name and version number used in the log message
 * @param pushJobDetailsValue supplies the overall statuses, cluster name, push job configs,
 *        push id and latest checkpoint
 * @param pushJobUserErrorCheckpoints forwarded to {@code isPushJobFailedDueToUserError} to
 *        decide whether a failure is attributed to user error
 */
static void emitPushJobStatusMetrics(
Map<String, PushJobStatusStats> pushJobStatusStatsMap,
PushJobDetails pushJobDetails,
PushJobStatusRecordKey pushJobDetailsKey,
PushJobDetails pushJobDetailsValue,
Set<PushJobCheckpoints> pushJobUserErrorCheckpoints) {
List<PushJobDetailsStatusTuple> overallStatuses = pushJobDetails.getOverallStatus();
List<PushJobDetailsStatusTuple> overallStatuses = pushJobDetailsValue.getOverallStatus();
// No status reported yet, so there is nothing to classify or log.
if (overallStatuses.isEmpty()) {
return;
}
try {
// Only the last entry of the status list is considered.
PushJobDetailsStatus overallStatus =
PushJobDetailsStatus.valueOf(overallStatuses.get(overallStatuses.size() - 1).getStatus());
// Metrics and the summary log are produced only for terminal statuses.
if (PushJobDetailsStatus.isTerminal(overallStatus.getValue())) {
String cluster = pushJobDetails.getClusterName().toString();
String cluster = pushJobDetailsValue.getClusterName().toString();
// NOTE(review): Map.get may return null if no stats are registered for this cluster; the
// record* calls below would then throw and be handled by the catch block - confirm intended.
PushJobStatusStats pushJobStatusStats = pushJobStatusStatsMap.get(cluster);
// The incremental-push flag is read from the push job configs; it stays false when the
// "incremental.push" key is absent.
Utf8 incPushKey = new Utf8("incremental.push");
boolean isIncrementalPush = false;
if (pushJobDetails.getPushJobConfigs().containsKey(incPushKey)) {
isIncrementalPush = Boolean.parseBoolean(pushJobDetails.getPushJobConfigs().get(incPushKey).toString());
if (pushJobDetailsValue.getPushJobConfigs().containsKey(incPushKey)) {
isIncrementalPush = Boolean.parseBoolean(pushJobDetailsValue.getPushJobConfigs().get(incPushKey).toString());
}
// Common prefix of the summary log line; the outcome is appended in the branches below.
StringBuilder logMessage = new StringBuilder();
logMessage.append("Push job status for store name: ")
.append(pushJobDetailsKey.getStoreName())
.append(", version: ")
.append(pushJobDetailsKey.getVersionNumber())
.append(", cluster: ")
.append(cluster);
if (PushJobDetailsStatus.isFailed(overallStatus)) {
if (isPushJobFailedDueToUserError(overallStatus, pushJobDetails, pushJobUserErrorCheckpoints)) {
logMessage.append(" failed with status: ").append(overallStatus);
// Failures are split into user-error and non-user-error sensors, each with separate
// incremental and batch variants.
if (isPushJobFailedDueToUserError(overallStatus, pushJobDetailsValue, pushJobUserErrorCheckpoints)) {
logMessage.append(" due to user error");
if (isIncrementalPush) {
pushJobStatusStats.recordIncrementalPushFailureDueToUserErrorSensor();
} else {
pushJobStatusStats.recordBatchPushFailureDueToUserErrorSensor();
}
} else {
logMessage.append(" due to non-user error");
if (isIncrementalPush) {
pushJobStatusStats.recordIncrementalPushFailureNotDueToUserErrorSensor();
} else {
pushJobStatusStats.recordBatchPushFailureNotDueToUserErrorSensor();
}
}
} else if (PushJobDetailsStatus.isSucceeded(overallStatus)) {
logMessage.append(" succeeded with status: ").append(overallStatus);
// Emit metrics for successful push jobs
if (isIncrementalPush) {
pushJobStatusStats.recordIncrementalPushSuccessSensor();
} else {
pushJobStatusStats.recordBatchPushSuccessSensor();
}
}
// Logged for every terminal status. If the status is neither failed nor succeeded, the
// message holds only the store/version/cluster prefix and no metric has been recorded.
LOGGER.info(
"{}. Incremental push: {}, push job id: {}, checkpoint: {}",
logMessage.toString(),
isIncrementalPush,
pushJobDetailsValue.getPushId(),
PushJobCheckpoints.valueOf(pushJobDetailsValue.getPushJobLatestCheckpoint()));
}
} catch (Exception e) {
// Any failure while classifying or recording is logged here rather than rethrown.
LOGGER.error("Failed to emit push job status metrics with pushJobDetails: {}", pushJobDetails.toString(), e);
LOGGER.error(
"Failed to emit push job status metrics. pushJobDetails key: {}, value: {}",
pushJobDetailsKey.toString(),
pushJobDetailsValue.toString(),
e);
}
}

Expand All @@ -1253,7 +1275,7 @@ static void emitPushJobStatusMetrics(
@Override
public void sendPushJobDetails(PushJobStatusRecordKey key, PushJobDetails value) {
// Emit push job status metrics
emitPushJobStatusMetrics(pushJobStatusStatsMap, value, pushJobUserErrorCheckpoints);
emitPushJobStatusMetrics(pushJobStatusStatsMap, key, value, pushJobUserErrorCheckpoints);
// Send push job details to the push job status system store
if (pushJobStatusStoreClusterName.isEmpty()) {
throw new VeniceException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.linkedin.venice.status.PushJobDetailsStatus;
import com.linkedin.venice.status.protocol.PushJobDetails;
import com.linkedin.venice.status.protocol.PushJobDetailsStatusTuple;
import com.linkedin.venice.status.protocol.PushJobStatusRecordKey;
import com.linkedin.venice.utils.DataProviderUtils;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -38,6 +39,7 @@ public class TestPushJobStatusStats {
public void testEmitPushJobStatusMetrics(boolean isIncrementalPush, boolean useUserProvidedUserErrorCheckpoints) {
Set<PushJobCheckpoints> userErrorCheckpoints =
useUserProvidedUserErrorCheckpoints ? CUSTOM_USER_ERROR_CHECKPOINTS : DEFAULT_PUSH_JOB_USER_ERROR_CHECKPOINTS;
PushJobStatusRecordKey key = new PushJobStatusRecordKey("test", 1);
PushJobDetails pushJobDetails = mock(PushJobDetails.class);
Map<CharSequence, CharSequence> pushJobConfigs = new HashMap<>();
pushJobConfigs.put(new Utf8("incremental.push"), String.valueOf(isIncrementalPush));
Expand Down Expand Up @@ -65,7 +67,7 @@ public void testEmitPushJobStatusMetrics(boolean isIncrementalPush, boolean useU

for (PushJobCheckpoints checkpoint: PushJobCheckpoints.values()) {
when(pushJobDetails.getPushJobLatestCheckpoint()).thenReturn(checkpoint.getValue());
emitPushJobStatusMetrics(pushJobStatusStatsMap, pushJobDetails, userErrorCheckpoints);
emitPushJobStatusMetrics(pushJobStatusStatsMap, key, pushJobDetails, userErrorCheckpoints);
boolean isUserError = userErrorCheckpoints.contains(checkpoint);

if (isUserError) {
Expand Down

0 comments on commit fe01543

Please sign in to comment.