Skip to content

Commit

Permalink
[GOBBLIN-2179] Enhance GoT observability with WorkUnitsSizeSummary
Browse files Browse the repository at this point in the history
…and `WorkUnitSizeInfo` (#4082)
  • Loading branch information
phet authored and Will-Lo committed Dec 11, 2024
1 parent a3854ae commit 2fdf422
Show file tree
Hide file tree
Showing 24 changed files with 786 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@


/**
* Collects messages from log4j and converts them into issues that are used in {@link AutomaticTroubleshooter}.
* Collects messages from log4j and converts them into {@link Issue}s used by the {@link org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter}.
*/
@Slf4j
@ThreadSafe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class Issue {
*
* It can be used for making programmatic decisions on how to handle and recover from this issue.
*
* The code length should be less than {@link Issue.MAX_ISSUE_CODE_LENGTH}
* The code length should be less than {@link Issue#MAX_ISSUE_CODE_LENGTH}
* */
private final String code;

Expand All @@ -71,14 +71,14 @@ public class Issue {
*
* This is a full name of the class that logged the error or generated the issue.
*
* The class name length should be less than {@link Issue.MAX_CLASSNAME_LENGTH}
* The class name length should be less than {@link Issue#MAX_CLASSNAME_LENGTH}
* */
private final String sourceClass;

/**
* If the issue was generated from an exception, then a full exception class name should be stored here.
*
* The class name length should be less than {@link Issue.MAX_CLASSNAME_LENGTH}
* The class name length should be less than {@link Issue#MAX_CLASSNAME_LENGTH}
*/
private final String exceptionClass;

Expand Down
1 change: 1 addition & 0 deletions gobblin-temporal/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ dependencies {
compile (externalDependency.helix) {
exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
}
compile externalDependency.tdigest
compile externalDependency."temporal-sdk"
testCompile project(path: ':gobblin-cluster', configuration: 'tests')
testCompile project(":gobblin-example")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
/** Activity for generating {@link WorkUnit}s and persisting them to the {@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
@ActivityInterface
public interface GenerateWorkUnits {

public static final String NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES = GenerateWorkUnits.class.getName() + ".numWorkUnitsSizeInfoQuantiles";
public static final int DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES = 10;


/** @return the number of {@link WorkUnit}s generated and persisted */
@ActivityMethod
GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,31 @@
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.api.client.util.Lists;
import com.google.common.io.Closer;

import io.temporal.failure.ApplicationFailure;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import com.google.api.client.util.Lists;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import com.tdunning.math.stats.TDigest;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
Expand All @@ -50,6 +57,7 @@
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
Expand All @@ -58,6 +66,39 @@
@Slf4j
public class GenerateWorkUnitsImpl implements GenerateWorkUnits {

/** [Internal, implementation class] Size sketch/digest of a collection of {@link MultiWorkUnit}s */
@Data
@VisibleForTesting
protected static class WorkUnitsSizeDigest {
private final long totalSize;
/** a top-level work unit has no parent - a root */
private final TDigest topLevelWorkUnitsSizeDigest;
/** a constituent work unit has no children - a leaf */
private final TDigest constituentWorkUnitsSizeDigest;

public WorkUnitsSizeSummary asSizeSummary(int numQuantiles) {
Preconditions.checkArgument(numQuantiles > 0, "numQuantiles must be > 0");
final double quantilesWidth = 1.0 / numQuantiles;

List<Double> topLevelQuantileValues = getQuantiles(topLevelWorkUnitsSizeDigest, numQuantiles);
List<Double> constituentQuantileValues = getQuantiles(constituentWorkUnitsSizeDigest, numQuantiles);
return new WorkUnitsSizeSummary(
totalSize,
topLevelWorkUnitsSizeDigest.size(), constituentWorkUnitsSizeDigest.size(),
numQuantiles, quantilesWidth,
topLevelQuantileValues, constituentQuantileValues);
}

private static List<Double> getQuantiles(TDigest digest, int numQuantiles) {
List<Double> quantileMinSizes = Lists.newArrayList();
for (int i = 1; i <= numQuantiles; i++) {
quantileMinSizes.add(digest.quantile((i * 1.0) / numQuantiles));
}
return quantileMinSizes;
}
}


@Override
public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
// TODO: decide whether to acquire a job lock (as MR did)!
Expand All @@ -80,12 +121,18 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
FileSystem fs = JobStateUtils.openFileSystem(jobState);
fs.mkdirs(workDirRoot);

Set<String> resourcesToCleanUp = new HashSet<>();
List<WorkUnit> workUnits = generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, eventSubmitterContext, closer, resourcesToCleanUp);
Set<String> pathsToCleanUp = new HashSet<>();
List<WorkUnit> workUnits = generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, eventSubmitterContext, closer, pathsToCleanUp);

int numSizeSummaryQuantiles = getConfiguredNumSizeSummaryQuantiles(jobState);
WorkUnitsSizeSummary wuSizeSummary = digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles);
log.info("Discovered WorkUnits: {}", wuSizeSummary);

JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
JobStateUtils.writeJobState(jobState, workDirRoot, fs);
JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION: the writing of `JobState` after all WUs signifies WU gen+serialization now complete

return new GenerateWorkUnitsResult(jobState.getTaskCount(), resourcesToCleanUp);
String sourceClassName = JobStateUtils.getSourceClassName(jobState);
return new GenerateWorkUnitsResult(jobState.getTaskCount(), sourceClassName, wuSizeSummary, pathsToCleanUp);
} catch (ReflectiveOperationException roe) {
String errMsg = "Unable to construct a source for generating workunits for job " + jobState.getJobId();
log.error(errMsg, roe);
Expand All @@ -101,7 +148,7 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
}

protected List<WorkUnit> generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, EventSubmitterContext eventSubmitterContext, Closer closer,
Set<String> resourcesToCleanUp)
Set<String> pathsToCleanUp)
throws ReflectiveOperationException {
Source<?, ?> source = JobStateUtils.createSource(jobState);
WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
Expand All @@ -127,7 +174,7 @@ protected List<WorkUnit> generateWorkUnitsForJobStateAndCollectCleanupPaths(JobS
DestinationDatasetHandlerService datasetHandlerService = closer.register(
new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, eventSubmitterContext.create()));
WorkUnitStream handledWorkUnitStream = datasetHandlerService.executeHandlers(workUnitStream);
resourcesToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream));
pathsToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream));
// initialize writer and converter(s)
// TODO: determine whether registration here is effective, or the lifecycle of this activity is too brief (as is likely!)
closer.register(WriterInitializerFactory.newInstace(jobState, handledWorkUnitStream)).initialize();
Expand All @@ -151,24 +198,24 @@ protected List<WorkUnit> generateWorkUnitsForJobStateAndCollectCleanupPaths(JobS
}

protected static Set<String> calculateWorkDirsToCleanup(WorkUnitStream workUnitStream) {
Set<String> resourcesToCleanUp = new HashSet<>();
Set<String> workDirPaths = new HashSet<>();
// Validate every workunit if they have the temp dir props since some workunits may be commit steps
Iterator<WorkUnit> workUnitIterator = workUnitStream.getWorkUnits();
while (workUnitIterator.hasNext()) {
WorkUnit workUnit = workUnitIterator.next();
if (workUnit.isMultiWorkUnit()) {
List<WorkUnit> workUnitList = ((MultiWorkUnit) workUnit).getWorkUnits();
for (WorkUnit wu : workUnitList) {
resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(wu));
// WARNING/TODO: NOT resilient to nested multi-workunits... should it be?
workDirPaths.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(wu));
}
} else {
resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(workUnit));
workDirPaths.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(workUnit));
}
}
return resourcesToCleanUp;
return workDirPaths;
}


private static Set<String> collectTaskStagingAndOutputDirsFromWorkUnit(WorkUnit workUnit) {
Set<String> resourcesToCleanUp = new HashSet<>();
if (workUnit.contains(ConfigurationKeys.WRITER_STAGING_DIR)) {
Expand All @@ -183,4 +230,41 @@ private static Set<String> collectTaskStagingAndOutputDirsFromWorkUnit(WorkUnit
}
return resourcesToCleanUp;
}

/** @return the {@link WorkUnitsSizeDigest} for `workUnits` */
protected static WorkUnitsSizeDigest digestWorkUnitsSize(List<WorkUnit> workUnits) {
AtomicLong totalSize = new AtomicLong(0L);
TDigest topLevelWorkUnitsDigest = TDigest.createDigest(100);
TDigest constituentWorkUnitsDigest = TDigest.createDigest(100);

Iterator<WorkUnit> workUnitIterator = workUnits.iterator();
while (workUnitIterator.hasNext()) {
WorkUnit workUnit = workUnitIterator.next();
if (workUnit.isMultiWorkUnit()) {
List<WorkUnit> subWorkUnitsList = ((MultiWorkUnit) workUnit).getWorkUnits();
AtomicLong mwuAggSize = new AtomicLong(0L);
// WARNING/TODO: NOT resilient to nested multi-workunits... should it be?
subWorkUnitsList.stream().mapToLong(wu -> wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0)).forEach(wuSize -> {
constituentWorkUnitsDigest.add(wuSize);
mwuAggSize.addAndGet(wuSize);
});
totalSize.addAndGet(mwuAggSize.get());
topLevelWorkUnitsDigest.add(mwuAggSize.get());
} else {
long wuSize = workUnit.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0);
totalSize.addAndGet(wuSize);
constituentWorkUnitsDigest.add(wuSize);
topLevelWorkUnitsDigest.add(wuSize);
}
}

// TODO - decide whether helpful/necessary to `.compress()`
topLevelWorkUnitsDigest.compress();
constituentWorkUnitsDigest.compress();
return new WorkUnitsSizeDigest(totalSize.get(), topLevelWorkUnitsDigest, constituentWorkUnitsDigest);
}

public static int getConfiguredNumSizeSummaryQuantiles(State state) {
return state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
Expand Down Expand Up @@ -76,9 +77,14 @@ public static FileSystem openFileSystem(JobState jobState) throws IOException {
return Help.loadFileSystemForUriForce(getFileSystemUri(jobState), jobState);
}

/** @return a new instance of {@link Source} identified by {@link ConfigurationKeys#SOURCE_CLASS_KEY} */
/** @return the FQ class name, presumed configured as {@link ConfigurationKeys#SOURCE_CLASS_KEY} */
public static String getSourceClassName(JobState jobState) {
return jobState.getProp(ConfigurationKeys.SOURCE_CLASS_KEY);
}

/** @return a new instance of {@link Source}, identified by {@link ConfigurationKeys#SOURCE_CLASS_KEY} */
public static Source<?, ?> createSource(JobState jobState) throws ReflectiveOperationException {
Class<?> sourceClass = Class.forName(jobState.getProp(ConfigurationKeys.SOURCE_CLASS_KEY));
Class<?> sourceClass = Class.forName(getSourceClassName(jobState));
log.info("Creating source: '{}'", sourceClass.getName());
Source<?, ?> source = new SourceDecorator<>(
Source.class.cast(sourceClass.newInstance()),
Expand Down Expand Up @@ -145,7 +151,10 @@ public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) {
return fs.makeQualified(jobOutputPath);
}

/** write serialized {@link WorkUnit}s in parallel into files named after the jobID and task IDs */
/**
* write serialized {@link WorkUnit}s in parallel into files named to tunnel {@link org.apache.gobblin.util.WorkUnitSizeInfo}.
* {@link EagerFsDirBackedWorkUnitClaimCheckWorkload} (and possibly others) may later recover such size info.
*/
public static void writeWorkUnits(List<WorkUnit> workUnits, Path workDirRootPath, JobState jobState, FileSystem fs)
throws IOException {
String jobId = jobState.getJobId();
Expand All @@ -159,7 +168,8 @@ public static void writeWorkUnits(List<WorkUnit> workUnits, Path workDirRootPath
JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new JobLauncherUtils.WorkUnitPathCalculator();
int i = 0;
for (WorkUnit workUnit : workUnits) {
Path workUnitFile = pathCalculator.calcNextPath(workUnit, jobId, targetDirPath);
// tunnel each WU's size info via its filename, for `EagerFsDirBackedWorkUnitClaimCheckWorkload#extractTunneledWorkUnitSizeInfo`
Path workUnitFile = pathCalculator.calcNextPathWithTunneledSizeInfo(workUnit, jobId, targetDirPath);
if (i++ == 0) {
log.info("Writing work unit file [first of {}]: '{}'", workUnits.size(), workUnitFile);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@

import java.net.URI;
import java.util.Comparator;
import java.util.Optional;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import lombok.extern.slf4j.Slf4j;

import com.fasterxml.jackson.annotation.JsonIgnore;

import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.WorkUnitSizeInfo;


/**
Expand All @@ -33,6 +40,7 @@
*/
@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@lombok.ToString(callSuper = true)
@Slf4j
public class EagerFsDirBackedWorkUnitClaimCheckWorkload extends AbstractEagerFsDirBackedWorkload<WorkUnitClaimCheck> {
private EventSubmitterContext eventSubmitterContext;

Expand All @@ -43,8 +51,9 @@ public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String hdfs

@Override
protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) {
// begin by setting all correlators to empty
return new WorkUnitClaimCheck("", this.getFileSystemUri(), fileStatus.getPath().toString(), this.eventSubmitterContext);
// begin by setting all correlators to empty string - later we'll `acknowledgeOrdering()`
Path filePath = fileStatus.getPath();
return new WorkUnitClaimCheck("", this.getFileSystemUri(), filePath.toString(), extractTunneledWorkUnitSizeInfo(filePath), this.eventSubmitterContext);
}

@Override
Expand All @@ -58,4 +67,20 @@ protected void acknowledgeOrdering(int index, WorkUnitClaimCheck item) {
// later, after the post-total-ordering indices are know, use each item's index as its correlator
item.setCorrelator(Integer.toString(index));
}

/**
* @return the {@link WorkUnitSizeInfo}, when encoded in the filename; otherwise {@link WorkUnitSizeInfo#empty()} when no size info about {@link WorkUnit}
* @see org.apache.gobblin.util.JobLauncherUtils.WorkUnitPathCalculator#calcNextPathWithTunneledSizeInfo(WorkUnit, String, Path)
*/
protected static WorkUnitSizeInfo extractTunneledWorkUnitSizeInfo(Path filePath) {
String fileName = filePath.getName();
Optional<WorkUnitSizeInfo> optSizeInfo = Optional.empty();
try {
String maybeEncodedSizeInfo = Id.parse(fileName.substring(0, fileName.lastIndexOf('.'))).getName(); // strip extension
optSizeInfo = WorkUnitSizeInfo.decode(maybeEncodedSizeInfo);
} catch (Exception e) { // log, but swallow any `Id.parse` error
log.warn("Filename NOT `Id.parse`able: '" + filePath + "' - " + e.getMessage());
}
return optSizeInfo.orElse(WorkUnitSizeInfo.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class GenerateWorkUnitsResult {
// NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
@NonNull private int generatedWuCount;
@NonNull private String sourceClass;
@NonNull private WorkUnitsSizeSummary workUnitsSizeSummary;
// Resources that the Temporal Job Launcher should clean up for Gobblin temporary work directory paths in writers
@NonNull private Set<String> workDirPathsToDelete;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@
@EqualsAndHashCode(callSuper = true) // to prevent findbugs warning - "equals method overrides equals in superclass and may not be symmetric"
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
public class PriorJobStateWUProcessingSpec extends WUProcessingSpec {
@NonNull
private List<Tag<?>> tags = new ArrayList<>();
@NonNull private List<Tag<?>> tags = new ArrayList<>();
@NonNull private String metricsSuffix = GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;

public PriorJobStateWUProcessingSpec(URI fileSystemUri, String workUnitsDir, EventSubmitterContext eventSubmitterContext) {
Expand Down
Loading

0 comments on commit 2fdf422

Please sign in to comment.