Skip to content

Commit

Permalink
[GOBBLIN-2135] Cache Gobblin YARN application jars (#4030)
Browse files Browse the repository at this point in the history
* Implement caching for gobblin yarn app launcher so that jobs do not repeatedly upload jars and files to hdfs
  • Loading branch information
Will-Lo committed Sep 17, 2024
1 parent 9409fed commit e995ef6
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.SerializationUtils;
import org.apache.gobblin.util.filesystem.HdfsJarUploadUtils;
import org.apache.gobblin.util.reflection.RestrictedFieldAccessingUtils;
/**
* An implementation of {@link JobLauncher} that launches a Gobblin job as a Hadoop MR job.
Expand Down Expand Up @@ -154,8 +155,6 @@ public class MRJobLauncher extends AbstractJobLauncher {
// Configuration that makes uploading of jar files more reliable,
// since multiple Gobblin Jobs are sharing the same jar directory.
private static final int MAXIMUM_JAR_COPY_RETRY_TIMES_DEFAULT = 5;
private static final int WAITING_TIME_ON_IMCOMPLETE_UPLOAD = 3000;

public static final String MR_TYPE_KEY = ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "mr.type";
public static final String MAPPER_TASK_NUM_KEY = ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.mapper.task.num";
public static final String MAPPER_TASK_ATTEMPT_NUM_KEY = ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX + "reporting.mapper.task.attempt.num";
Expand Down Expand Up @@ -572,56 +571,22 @@ private void addJars(Path jarFileDir, String jarFileList, Configuration conf) th
for (String jarFile : SPLITTER.split(jarFileList)) {
Path srcJarFile = new Path(jarFile);
FileStatus[] fileStatusList = lfs.globStatus(srcJarFile);

for (FileStatus status : fileStatusList) {
Path destJarFile = HdfsJarUploadUtils.calculateDestJarFilePath(fs, status.getPath().getName(), this.unsharedJarsDir, jarFileDir);
// For each FileStatus there are chances it could fail in copying at the first attempt, due to file-existence
// or file-copy is ongoing by other job instance since all Gobblin jobs share the same jar file directory.
// the retryCount is to avoid cases (if any) where retry is going too far and causes job hanging.
int retryCount = 0;
boolean shouldFileBeAddedIntoDC = true;
Path destJarFile = calculateDestJarFile(status, jarFileDir);
// Adding destJarFile into HDFS until it exists and the size of file on targetPath matches the one on local path.
while (!this.fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
try {
if (this.fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != status.getLen()) {
Thread.sleep(WAITING_TIME_ON_IMCOMPLETE_UPLOAD);
throw new IOException("Waiting for file to complete on uploading ... ");
}
// Set the first parameter as false for not deleting sourceFile
// Set the second parameter as false for not overwriting existing file on the target, by default it is true.
// If the file already exists and the overwrite flag is set to false, an IOException is thrown.
this.fs.copyFromLocalFile(false, false, status.getPath(), destJarFile);
} catch (IOException | InterruptedException e) {
LOG.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
retryCount += 1;
if (retryCount >= this.jarFileMaximumRetry) {
LOG.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
// If retry reaches upper limit, skip copying this file.
shouldFileBeAddedIntoDC = false;
break;
}
}
}
if (shouldFileBeAddedIntoDC) {
if (HdfsJarUploadUtils.uploadJarToHdfs(this.fs, status, this.jarFileMaximumRetry, destJarFile)) {
// Then add the jar file on HDFS to the classpath
LOG.info(String.format("Adding %s to classpath", destJarFile));
DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
} else {
LOG.error("Failed to upload jar file: " + status.getPath());
}
}
}
}

/**
 * Resolve the HDFS location a local jar should be copied to.
 *
 * @param status the {@link FileStatus} of the local jar file
 * @param jarFileDir the jar directory used for jars whose name does not contain "SNAPSHOT"
 * @return the destination path, built from the qualified base directory and the jar's file name
 */
private Path calculateDestJarFile(FileStatus status, Path jarFileDir) {
  String jarName = status.getPath().getName();

  // SNAPSHOT jars should not be shared, as different jobs may be using different versions of it
  Path baseDir;
  if (jarName.contains("SNAPSHOT")) {
    baseDir = this.unsharedJarsDir;
  } else {
    baseDir = jarFileDir;
  }

  // DistributedCache requires absolute path, so we need to use makeQualified.
  Path qualifiedBaseDir = this.fs.makeQualified(baseDir);
  return new Path(qualifiedBaseDir, jarName);
}

/**
* Add local non-jar files the job depends on to DistributedCache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ class YarnService extends AbstractIdleService {

private volatile boolean shutdownInProgress = false;

private final boolean jarCacheEnabled;

public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
this.applicationName = applicationName;
Expand Down Expand Up @@ -270,6 +272,7 @@ public YarnService(Config config, String applicationName, String applicationId,
GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -484,12 +487,30 @@ private void requestContainer(Optional<String> preferredNode, Resource resource)
protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo)
throws IOException {
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId);
// Used for -SNAPSHOT versions of jars
Path containerJarsUnsharedDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
Path jarCacheDir = this.jarCacheEnabled ? YarnHelixUtils.calculatePerMonthJarCachePath(this.config) : appWorkDir;
Path containerJarsCachedDir = new Path(jarCacheDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
LOGGER.info("Container cached jars root dir: " + containerJarsCachedDir);
LOGGER.info("Container execution-private jars root dir: " + containerJarsUnsharedDir);
Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);

Map<String, LocalResource> resourceMap = Maps.newHashMap();

Map<String, LocalResource> resourceMap = Maps.newHashMap();
// Always fetch any jars from the appWorkDir for any potential snapshot jars
addContainerLocalResources(new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
addContainerLocalResources(new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), resourceMap);
if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
addContainerLocalResources(new Path(containerJarsUnsharedDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME),
resourceMap);
}
if (this.jarCacheEnabled) {
addContainerLocalResources(new Path(jarCacheDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
addContainerLocalResources(new Path(containerJarsCachedDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME),
resourceMap);
}
}

addContainerLocalResources(
new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME), resourceMap);

Expand Down Expand Up @@ -579,8 +600,6 @@ protected String buildContainerCommand(Container container, String helixParticip
containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
.append(" ").append(helixInstanceTag);
}

LOGGER.info("Building " + containerProcessName);
return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
containerProcessName).append(".").append(ApplicationConstants.STDOUT)
.append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
Expand Down Expand Up @@ -797,7 +816,6 @@ public void onContainersAllocated(List<Container> containers) {
public void run() {
try {
LOGGER.info("Starting container " + containerId);

nmClientAsync.startContainerAsync(container, newContainerLaunchContext(containerInfo));
} catch (IOException ioe) {
LOGGER.error("Failed to start container " + containerId, ioe);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.util.filesystem;

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import lombok.extern.slf4j.Slf4j;


/**
* Utility class for uploading jar files to HDFS with retries to handle concurrency
*/
@Slf4j
public class HdfsJarUploadUtils {

private static final long WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS = 3000;

/**
* Calculate the target filePath of the jar file to be copied on HDFS,
* given the {@link FileStatus} of a jarFile and the path of directory that contains jar.
* Snapshot dirs should not be shared, as different jobs may be using different versions of it.
* @param fs
* @param jarName
* @param unsharedJarsDir
* @param jarCacheDir
* @return
* @throws IOException
*/
public static Path calculateDestJarFilePath(FileSystem fs, String jarName, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
Path uploadDir = jarName.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
Path destJarFile = new Path(fs.makeQualified(uploadDir), jarName);
return destJarFile;
}
/**
* Upload a jar file to HDFS with retries to handle already existing jars
* @param fs
* @param localJar
* @param destJarFile
* @param maxAttempts
* @return
* @throws IOException
*/
public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int maxAttempts, Path destJarFile) throws IOException {
int retryCount = 0;
while (!fs.exists(destJarFile) || fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) {
try {
if (fs.exists(destJarFile) && fs.getFileStatus(destJarFile).getLen() != localJar.getLen()) {
Thread.sleep(WAITING_TIME_ON_INCOMPLETE_UPLOAD_MILLIS);
throw new IOException("Waiting for file to complete on uploading ... ");
}
boolean deleteSourceFile = false;
boolean overwriteAnyExistingDestFile = false; // IOException will be thrown if does already exist
fs.copyFromLocalFile(deleteSourceFile, overwriteAnyExistingDestFile, localJar.getPath(), destJarFile);
} catch (IOException | InterruptedException e) {
log.warn("Path:" + destJarFile + " is not copied successfully. Will require retry.");
retryCount += 1;
if (retryCount >= maxAttempts) {
log.error("The jar file:" + destJarFile + "failed in being copied into hdfs", e);
// If retry reaches upper limit, skip copying this file.
return false;
}
}
}
return true;
}
}
Loading

0 comments on commit e995ef6

Please sign in to comment.