From 9e90236f663570281a2dc3c27cf5a00ba2be6e6e Mon Sep 17 00:00:00 2001 From: thomas Date: Mon, 15 Jul 2019 17:38:16 +0200 Subject: [PATCH 1/2] adding CombineInputFileFormat; only single use case so far --- .../calvalus/processing/ProcessorAdapter.java | 16 ++-- .../processing/analysis/QLMapper.java | 2 +- .../processing/beam/SnapGraphAdapter.java | 6 +- .../beam/SubsetProcessorAdapter.java | 2 +- .../ExecutableProcessorAdapter.java | 14 ++-- .../processing/geodb/GeodbScanMapper.java | 2 +- .../processing/hadoop/HadoopWorkflowItem.java | 27 +++--- .../processing/l2/CombineFileInputFormat.java | 83 +++++++++++++++++++ .../processing/l2/L2FormattingMapper.java | 2 +- .../bc/calvalus/processing/l2/L2Mapper.java | 4 +- .../processing/l2/ProcessingMapper.java | 8 +- .../processing/l2tol3/L2toL3Mapper.java | 6 +- .../bc/calvalus/processing/l3/L3Mapper.java | 6 +- .../processing/mosaic/MosaicMapper.java | 2 +- .../cli/CalvalusHadoopRequestConverter.java | 32 ++++++- .../production/cli/ProductionTool.java | 35 -------- 16 files changed, 168 insertions(+), 79 deletions(-) create mode 100644 calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/CombineFileInputFormat.java diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/ProcessorAdapter.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/ProcessorAdapter.java index 960a6d374..8adf321dd 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/ProcessorAdapter.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/ProcessorAdapter.java @@ -26,6 +26,7 @@ import com.vividsolutions.jts.geom.Geometry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.MapContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; @@ -257,7 +258,7 @@ protected Path getWorkOutputDirectoryPath() throws IOException { Path appendDatePart(Path path) { if (getConfiguration().getBoolean(JobConfigNames.CALVALUS_OUTPUT_PRESERVE_DATE_TREE, false)) { - String datePart = getDatePart(getInputPath()); + String datePart = getDatePart(getInputPaths()[0]); if (datePart != null) { path = new Path(path, datePart); } @@ -329,13 +330,16 @@ public void setProcessingRectangle(Rectangle roiRectangle) { * * @return The path of the input product. */ - public Path getInputPath() { + public Path[] getInputPaths() { if (inputSplit instanceof ProductSplit) { ProductSplit productSplit = (ProductSplit) inputSplit; - return productSplit.getPath(); + return new Path[] {productSplit.getPath()}; } else if (inputSplit instanceof FileSplit) { FileSplit fileSplit = (FileSplit) inputSplit; - return fileSplit.getPath(); + return new Path[] {fileSplit.getPath()}; + } else if (inputSplit instanceof CombineFileSplit) { + CombineFileSplit fileSplit = (CombineFileSplit) inputSplit; + return fileSplit.getPaths(); } else { throw new IllegalArgumentException("input split is neither a FileSplit nor a ProductSplit"); } @@ -434,8 +438,8 @@ private Product openInputProduct() throws IOException { CalvalusProductIO.printProductOnStdout(product, "opened from local file"); return product; } else { - LOG.info(String.format("openInputProduct: inputPath = %s inputFormat = %s", getInputPath(), inputFormat)); - Product product = CalvalusProductIO.readProduct(getInputPath(), getConfiguration(), inputFormat); + LOG.info(String.format("openInputProduct: inputPath = %s inputFormat = %s", getInputPaths()[0], inputFormat)); + Product product = CalvalusProductIO.readProduct(getInputPaths()[0], getConfiguration(), inputFormat); if (inputSplit instanceof FileSplit) { FileSplit fileSplit = (FileSplit) inputSplit; diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/analysis/QLMapper.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/analysis/QLMapper.java index 887db1d39..a750ca6d7 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/analysis/QLMapper.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/analysis/QLMapper.java @@ -57,7 +57,7 @@ public void run(Mapper.Context context) throws IOException, InterruptedException try { Product product = processorAdapter.getProcessedProduct(SubProgressMonitor.create(pm, 5)); if (product != null) { - final String inputFileName = processorAdapter.getInputPath().getName(); + final String inputFileName = processorAdapter.getInputPaths()[0].getName(); final String productName = FileUtils.getFilenameWithoutExtension(inputFileName); final Quicklooks.QLConfig[] configs = Quicklooks.get(context.getConfiguration()); for (Quicklooks.QLConfig config : configs) { diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/beam/SnapGraphAdapter.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/beam/SnapGraphAdapter.java index 979f70c04..1b8dfcf16 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/beam/SnapGraphAdapter.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/beam/SnapGraphAdapter.java @@ -239,7 +239,7 @@ private boolean executeGraphAndCollectOutput(Graph graph, XppDom calvalusAppData private boolean postprocessTargetProduct() throws IOException { if (getConfiguration().getBoolean(JobConfigNames.CALVALUS_OUTPUT_SUBSETTING, false)) { - getLogger().info("output subsetting of split " + getInputPath()); + getLogger().info("output subsetting of split " + getInputPaths()); targetProduct = createSubsetFromOutput(targetProduct); } if (targetProduct.getSceneRasterWidth() == 0 || targetProduct.getSceneRasterHeight() == 0) { @@ -265,7 +265,7 @@ private Product getTargetProductFromGraph(Graph graph, String targetNodeId) { private GraphContext buildGraphContext(Graph graph, Header header) throws IOException, GraphException { List sources = header.getSources(); - Path inputPath = getInputPath(); + Path inputPath = getInputPaths()[0]; Path qualifiedInputPath = inputPath.getFileSystem(getConfiguration()).makeQualified(inputPath); Operator sourceProducts = new SourceProductContainerOperator(); for (HeaderSource headerSource : sources) { @@ -325,7 +325,7 @@ public void dispose() { } public Graph createGraph() throws GraphException, IOException { - Path inputPath = getInputPath(); + Path inputPath = getInputPaths()[0]; CalvalusLogger.getLogger().info("Creating graph for input: " + inputPath); Configuration conf = getConfiguration(); String processorParameters = conf.get(JobConfigNames.CALVALUS_L2_PARAMETERS); diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/beam/SubsetProcessorAdapter.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/beam/SubsetProcessorAdapter.java index 7613e1321..a98e06b69 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/beam/SubsetProcessorAdapter.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/beam/SubsetProcessorAdapter.java @@ -127,7 +127,7 @@ protected String getOutputProductFilename() { return FileUtils.exchangeExtension(getInputParameters()[i + 1], ".seq"); } } - String inputFilename = getInputPath().getName(); + String inputFilename = getInputPaths()[0].getName(); return "L2_of_" + FileUtils.exchangeExtension(inputFilename, ".seq"); } diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/executable/ExecutableProcessorAdapter.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/executable/ExecutableProcessorAdapter.java index efe50dbfe..26329bb78 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/executable/ExecutableProcessorAdapter.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/executable/ExecutableProcessorAdapter.java @@ -20,8 +20,6 @@ import com.bc.calvalus.processing.ProcessorAdapter; import com.bc.calvalus.processing.beam.CalvalusProductIO; import com.bc.calvalus.processing.beam.LandsatCalvalusReaderPlugin; -import com.bc.calvalus.processing.beam.PathConfiguration; -import com.bc.calvalus.processing.beam.Sentinel2CalvalusReaderPlugin; import com.bc.calvalus.processing.beam.SnapGraphAdapter; import com.bc.calvalus.processing.l2.ProductFormatter; import com.bc.calvalus.processing.utils.ProductTransformation; @@ -107,7 +105,7 @@ public void prepareProcessing() throws IOException { velocityContext.put("parameterText", processorParameters); velocityContext.put("parameters", PropertiesHandler.asProperties(processorParameters)); - Path inputPath = getInputPath(); + Path inputPath = getInputPaths()[0]; Path outputPath = getOutputDirectoryPath(); velocityContext.put("inputPath", inputPath); velocityContext.put("outputPath", outputPath); @@ -145,11 +143,13 @@ public boolean canSkipInputProduct() throws IOException { @Override public boolean processSourceProduct(MODE mode, ProgressMonitor pm) throws IOException { - Path inputPath = getInputPath(); + Path[] inputPaths = getInputPaths(); File inputFile = getInputFile(); - if (inputFile == null) { + for (Path inputPath : inputPaths) { inputFile = CalvalusProductIO.copyFileToLocal(inputPath, getConfiguration()); - setInputFile(inputFile); + if (inputFile == null) { + setInputFile(inputFile); + } } if (getMapContext().getInputSplit() instanceof FileSplit) { FileSplit fileSplit = (FileSplit) getMapContext().getInputSplit(); @@ -162,7 +162,7 @@ public boolean processSourceProduct(MODE mode, ProgressMonitor pm) throws IOExce productRect = new Rectangle(inputProduct.getSceneRasterWidth(), inputProduct.getSceneRasterHeight()); } - outputFilesNames = processInput(pm, inputRectangle, inputPath, inputFile, productRect, null); + outputFilesNames = processInput(pm, inputRectangle, inputPaths[0], inputFile, productRect, null); return outputFilesNames.length > 0; } diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/geodb/GeodbScanMapper.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/geodb/GeodbScanMapper.java index c034536e2..e5c7eec48 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/geodb/GeodbScanMapper.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/geodb/GeodbScanMapper.java @@ -88,7 +88,7 @@ public void run(Context context) throws IOException, InterruptedException { if (endUTC != null) { endTime = dateFormat.format(endUTC.getAsDate()); } - String dbPath = getDBPath(processorAdapter.getInputPath(), context.getConfiguration()); + String dbPath = getDBPath(processorAdapter.getInputPaths()[0], context.getConfiguration()); String result = startTime + "\t" + endTime + "\t" + wkt; context.write(new Text(dbPath), new Text(result)); diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/hadoop/HadoopWorkflowItem.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/hadoop/HadoopWorkflowItem.java index 8aa9dd408..0d3f5e3a2 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/hadoop/HadoopWorkflowItem.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/hadoop/HadoopWorkflowItem.java @@ -16,10 +16,6 @@ package com.bc.calvalus.processing.hadoop; -import static com.bc.calvalus.processing.hadoop.HadoopProcessingService.CALVALUS_SOFTWARE_PATH; -import static com.bc.calvalus.processing.hadoop.HadoopProcessingService.DEFAULT_CALVALUS_BUNDLE; -import static com.bc.calvalus.processing.hadoop.HadoopProcessingService.DEFAULT_SNAP_BUNDLE; - import com.bc.calvalus.commons.AbstractWorkflowItem; import com.bc.calvalus.commons.CalvalusLogger; import com.bc.calvalus.commons.ProcessState; @@ -46,6 +42,10 @@ import java.util.function.Consumer; import java.util.logging.Level; +import static com.bc.calvalus.processing.hadoop.HadoopProcessingService.CALVALUS_SOFTWARE_PATH; +import static com.bc.calvalus.processing.hadoop.HadoopProcessingService.DEFAULT_CALVALUS_BUNDLE; +import static com.bc.calvalus.processing.hadoop.HadoopProcessingService.DEFAULT_SNAP_BUNDLE; + /** * A workflow item that corresponds to a single Hadoop job. */ @@ -213,7 +213,7 @@ public void submit() throws WorkflowException { CalvalusLogger.getLogger().info("Submitted Job with Id: " + jobId); CalvalusLogger.getLogger().info("-------------------------------"); CalvalusLogger.getLogger().info("remoteUser=" + remoteUser.getShortUserName() - + " mapreduce.job.user.name=" + job.getConfiguration().get("mapreduce.job.user.name")); + + " mapreduce.job.user.name=" + job.getConfiguration().get("mapreduce.job.user.name")); HashMap calvalusConfMap = new HashMap<>(); for (Map.Entry keyValue : job.getConfiguration()) { if (keyValue.getKey().startsWith("calvalus")) { @@ -230,7 +230,7 @@ public void accept(Map.Entry keyValue) { } }); CalvalusLogger.getLogger().info("-------------------------------"); - + setJobId(jobId); return job; }); @@ -287,13 +287,20 @@ protected JobID submitJob(Job job) throws IOException { protected Class getInputFormatClass(Configuration conf) throws IOException { if (conf.get(JobConfigNames.CALVALUS_INPUT_TABLE) != null) { return TableInputFormat.class; - } else if (conf.get(JobConfigNames.CALVALUS_INPUT_GEO_INVENTORY) != null || conf.get(JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS) != null) { + } else if (conf.get(JobConfigNames.CALVALUS_INPUT_GEO_INVENTORY) != null || conf.get(JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS) != null && conf.get("mapreduce.job.inputformat.class") == null) { return PatternBasedInputFormat.class; + } else if (conf.get("mapreduce.job.inputformat.class") != null) { + String classname = conf.get("mapreduce.job.inputformat.class"); + try { + return (Class) Class.forName(classname); + } catch (ClassNotFoundException e) { + throw new IOException("Unable to create class '" + classname + "'", e); + } } else { throw new IOException(String.format("Missing job parameter for inputFormat. Neither %s nor %s nor %s had been set.", - JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS, - JobConfigNames.CALVALUS_INPUT_TABLE, - JobConfigNames.CALVALUS_INPUT_GEO_INVENTORY)); + JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS, + JobConfigNames.CALVALUS_INPUT_TABLE, + JobConfigNames.CALVALUS_INPUT_GEO_INVENTORY)); } } diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/CombineFileInputFormat.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/CombineFileInputFormat.java new file mode 100644 index 000000000..ecbcebe8a --- /dev/null +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/CombineFileInputFormat.java @@ -0,0 +1,83 @@ +package com.bc.calvalus.processing.l2; + +import com.bc.calvalus.JobClientsMap; +import com.bc.calvalus.commons.CalvalusLogger; +import com.bc.calvalus.commons.InputPathResolver; +import com.bc.calvalus.inventory.hadoop.FileSystemPathIterator; +import com.bc.calvalus.inventory.hadoop.HdfsFileSystemService; +import com.bc.calvalus.processing.JobConfigNames; +import com.bc.calvalus.processing.geodb.GeodbScanMapper; +import com.bc.calvalus.processing.hadoop.NoRecordReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +/** + * @author thomas + */ +public class CombineFileInputFormat extends InputFormat { + + /** + * Creates a single split from a given pattern + */ + @Override + public List getSplits(JobContext context) throws IOException { + Configuration conf = context.getConfiguration(); + String inputPathPattern = conf.get(JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS); + + List splits = new ArrayList<>(1); + JobClientsMap jobClientsMap = new JobClientsMap(new JobConf(conf)); + HdfsFileSystemService hdfsFileSystemService = new HdfsFileSystemService(jobClientsMap); + List inputPatterns = new InputPathResolver().resolve(inputPathPattern); + RemoteIterator fileStatusIt = getFileStatuses(hdfsFileSystemService, inputPatterns, conf, null); + addSplit(fileStatusIt, splits); + CalvalusLogger.getLogger().info(String.format("Created %d split(s).", splits.size())); + return splits; + } + + private void addSplit(RemoteIterator fileStatuses, List splits) throws IOException { + List filePaths = new ArrayList<>(); + List fileLengths = new ArrayList<>(); + while (fileStatuses.hasNext()) { + LocatedFileStatus fileStatus = fileStatuses.next(); + Path path = fileStatus.getPath(); + filePaths.add(path); + fileLengths.add(fileStatus.getLen()); + } + CombineFileSplit combineFileSplit = new CombineFileSplit(filePaths.toArray(new Path[filePaths.size()]), + fileLengths.stream().mapToLong(Long::longValue).toArray()); + splits.add(combineFileSplit); + } + + + protected RemoteIterator getFileStatuses(HdfsFileSystemService fileSystemService, + List inputPatterns, + Configuration conf, + Set existingPathes) throws IOException { + FileSystemPathIterator.FileStatusFilter extraFilter = null; + if (existingPathes != null && existingPathes.size() > 0) { + extraFilter = fileStatus -> { + String dbPath = GeodbScanMapper.getDBPath(fileStatus.getPath(), conf); + return !existingPathes.contains(dbPath); + }; + } + return fileSystemService.globFileStatusIterator(inputPatterns, conf, extraFilter); + } + + public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { + return new NoRecordReader(); + } +} diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/L2FormattingMapper.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/L2FormattingMapper.java index ac3314873..ec0442ea3 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/L2FormattingMapper.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/L2FormattingMapper.java @@ -65,7 +65,7 @@ public void run(Mapper.Context context) throws IOException, InterruptedException pm.beginTask("Level 2 format", 100 + 20); try { Configuration jobConfig = context.getConfiguration(); - Path inputPath = processorAdapter.getInputPath(); + Path inputPath = processorAdapter.getInputPaths()[0]; String productName = getProductName(jobConfig, inputPath.getName()); String format = jobConfig.get(JobConfigNames.CALVALUS_OUTPUT_FORMAT); diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/L2Mapper.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/L2Mapper.java index 8829cf3fb..288c72d1e 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/L2Mapper.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/L2Mapper.java @@ -86,7 +86,7 @@ public void run(Context context) throws IOException, InterruptedException { Configuration jobConfig = context.getConfiguration(); ProcessorAdapter processorAdapter = ProcessorFactory.createAdapter(context); ProgressMonitor pm = new ProgressSplitProgressMonitor(context); - LOG.info("processing input " + processorAdapter.getInputPath() + " ..."); + LOG.info("processing input " + processorAdapter.getInputPaths() + " ..."); final int progressForProcessing = processorAdapter.supportsPullProcessing() ? 5 : 95; final int progressForSaving = processorAdapter.supportsPullProcessing() ? 95 : 5; pm.beginTask("Level 2 processing", progressForProcessing + progressForSaving); @@ -114,7 +114,7 @@ public void run(Context context) throws IOException, InterruptedException { if (jobConfig.get(JobConfigNames.CALVALUS_METADATA_TEMPLATE) != null) { processMetadata(context, - processorAdapter.getInputPath().toString(), + processorAdapter.getInputPaths().toString(), processorAdapter.getInputProduct(), processorAdapter.getOutputProductPath().toString(), processorAdapter.openProcessedProduct()); diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/ProcessingMapper.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/ProcessingMapper.java index cb03deb28..7fc2cf306 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/ProcessingMapper.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2/ProcessingMapper.java @@ -182,7 +182,7 @@ public void run(Context context) throws IOException, InterruptedException { final String outputCompression = jobConfig.get(JobConfigNames.OUTPUT_COMPRESSION); final ProcessorAdapter processorAdapter = ProcessorFactory.createAdapter(context); - String inputName = processorAdapter.getInputPath().getName(); + String inputName = processorAdapter.getInputPaths()[0].getName(); String productName = null; if (processorAdapter.getInputParameters() != null) { for (int i = 0; i < processorAdapter.getInputParameters().length; i += 2) { @@ -195,7 +195,7 @@ public void run(Context context) throws IOException, InterruptedException { if (! "MTD_MSIL1C.xml".equals(inputName)) { // TODO productName = getProductName(jobConfig, inputName); } else { - productName = getProductName(jobConfig, processorAdapter.getInputPath().getParent().getName()); + productName = getProductName(jobConfig, processorAdapter.getInputPaths()[0].getParent().getName()); } } final ProductFormatter productFormatter = outputFormat != null ? new ProductFormatter(productName, outputFormat, outputCompression) : null; @@ -204,7 +204,7 @@ public void run(Context context) throws IOException, InterruptedException { final int progressForProcessing = processorAdapter.supportsPullProcessing() ? 5 : 95; final int progressForSaving = processorAdapter.supportsPullProcessing() ? 95 : 5; - LOG.info("processing input " + processorAdapter.getInputPath() + " ..."); + LOG.info("processing input " + processorAdapter.getInputPaths() + " ..."); pm.beginTask("Level 2 processing", progressForProcessing + progressForSaving); try { @@ -244,7 +244,7 @@ public void run(Context context) throws IOException, InterruptedException { if (jobConfig.get(JobConfigNames.METADATA_TEMPLATE) != null) { context.setStatus("Metadata"); processMetadata(context, - processorAdapter.getInputPath().toString(), + processorAdapter.getInputPaths().toString(), processorAdapter.getInputProduct(), processorAdapter.getOutputProductPath().toString(), targetProduct); diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2tol3/L2toL3Mapper.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2tol3/L2toL3Mapper.java index 2eb629d06..191e7a5a5 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2tol3/L2toL3Mapper.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/l2tol3/L2toL3Mapper.java @@ -98,7 +98,7 @@ public void run(Context context) throws IOException, InterruptedException { L2toL3SpatialBinner spatialBinner = null; final ProcessorAdapter processorAdapter = ProcessorFactory.createAdapter(context); - LOG.info("processing input " + processorAdapter.getInputPath() + " ..."); + LOG.info("processing input " + processorAdapter.getInputPaths() + " ..."); ProgressMonitor pm = new ProgressSplitProgressMonitor(context); final int progressForProcessing = processorAdapter.supportsPullProcessing() ? 5 : 90; final int progressForBinning = processorAdapter.supportsPullProcessing() ? 90 : 20; @@ -144,14 +144,14 @@ public void run(Context context) throws IOException, InterruptedException { if (spatialBinner != null) { final Exception[] exceptions = spatialBinner.getExceptions(); for (Exception exception : exceptions) { - String m = MessageFormat.format("Failed to process input slice of {0}", processorAdapter.getInputPath()); + String m = MessageFormat.format("Failed to process input slice of {0}", processorAdapter.getInputPaths()); LOG.log(Level.SEVERE, m, exception); } } // write final log entry for runtime measurements LOG.info(MessageFormat.format( "Finishes processing of {1} after {2} sec ({3} observations seen, {4} bins produced)", - context.getTaskAttemptID(), processorAdapter.getInputPath(), + context.getTaskAttemptID(), processorAdapter.getInputPaths(), spatialBinEmitter.numObsTotal, spatialBinEmitter.numBinsTotal)); } diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/l3/L3Mapper.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/l3/L3Mapper.java index b8a686e5e..126a3d447 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/l3/L3Mapper.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/l3/L3Mapper.java @@ -72,7 +72,7 @@ public void run(Context context) throws IOException, InterruptedException { final SpatialBinEmitter spatialBinEmitter = new SpatialBinEmitter(context); final SpatialBinner spatialBinner = new SpatialBinner(binningContext, spatialBinEmitter); final ProcessorAdapter processorAdapter = ProcessorFactory.createAdapter(context); - LOG.info("processing input " + processorAdapter.getInputPath() + " ..."); + LOG.info("processing input " + processorAdapter.getInputPaths() + " ..."); ProgressMonitor pm = new ProgressSplitProgressMonitor(context); final int progressForProcessing = processorAdapter.supportsPullProcessing() ? 5 : 90; final int progressForBinning = processorAdapter.supportsPullProcessing() ? 90 : 20; @@ -108,11 +108,11 @@ public void run(Context context) throws IOException, InterruptedException { final Exception[] exceptions = spatialBinner.getExceptions(); for (Exception exception : exceptions) { - String m = MessageFormat.format("Failed to process input slice of {0}", processorAdapter.getInputPath()); + String m = MessageFormat.format("Failed to process input slice of {0}", processorAdapter.getInputPaths()); LOG.log(Level.SEVERE, m, exception); } LOG.info(MessageFormat.format("Finishes processing of {0} ({1} observations seen, {2} bins produced)", - processorAdapter.getInputPath(), + processorAdapter.getInputPaths(), spatialBinEmitter.numObsTotal, spatialBinEmitter.numBinsTotal)); } diff --git a/calvalus-processing/src/main/java/com/bc/calvalus/processing/mosaic/MosaicMapper.java b/calvalus-processing/src/main/java/com/bc/calvalus/processing/mosaic/MosaicMapper.java index 2f7cf5000..a519ff2fe 100644 --- a/calvalus-processing/src/main/java/com/bc/calvalus/processing/mosaic/MosaicMapper.java +++ b/calvalus-processing/src/main/java/com/bc/calvalus/processing/mosaic/MosaicMapper.java @@ -96,7 +96,7 @@ public void run(Context context) throws IOException, InterruptedException { LOG.info("Product not used"); } LOG.info(MessageFormat.format("{0} stops processing of {1} ({2} tiles produced)", - context.getTaskAttemptID(), processorAdapter.getInputPath(), numTilesProcessed)); + context.getTaskAttemptID(), processorAdapter.getInputPaths(), numTilesProcessed)); } finally { pm.done(); processorAdapter.dispose(); diff --git a/calvalus-production/src/main/java/com/bc/calvalus/production/cli/CalvalusHadoopRequestConverter.java b/calvalus-production/src/main/java/com/bc/calvalus/production/cli/CalvalusHadoopRequestConverter.java index 115a7a428..ae5bbff8f 100644 --- a/calvalus-production/src/main/java/com/bc/calvalus/production/cli/CalvalusHadoopRequestConverter.java +++ b/calvalus-production/src/main/java/com/bc/calvalus/production/cli/CalvalusHadoopRequestConverter.java @@ -95,6 +95,12 @@ public JobConf createJob(String requestPath, Map commandLinePara } LOG.info("reading .calvalus configuration with " + count + " parameters"); + for (Map.Entry hadoopParameter : hadoopParameters) { + LOG.info(hadoopParameter.getKey() + ": " + hadoopParameter.getValue()); + } + + LOG.info("############################"); + // add parameters of production type, maybe translate and apply function count = 0; for (Map.Entry entry : productionTypeDef.entrySet()) { @@ -103,20 +109,38 @@ public JobConf createJob(String requestPath, Map commandLinePara ++count; } } - LOG.info("reading production type definition from " + "etc/" + productionType + "-cht-type.json with " + LOG.info("read production type definition from " + "etc/" + productionType + "-cht-type.json with " + count + " parameters and " + (productionTypeDef.size() - count) + " rules"); + for (Map.Entry hadoopParameter : hadoopParameters) { + LOG.info(hadoopParameter.getKey() + ": " + hadoopParameter.getValue()); + } + + LOG.info("############################"); + // add parameters of request, maybe translate and apply function for (Map.Entry entry : request.entrySet()) { translateAndInsert(entry.getKey(), String.valueOf(entry.getValue()), productionTypeDef, hadoopParameters); } + for (Map.Entry hadoopParameter : hadoopParameters) { + LOG.info(hadoopParameter.getKey() + ": " + hadoopParameter.getValue()); + } + + LOG.info("############################"); + // add parameters of command line, maybe translate and apply function for (Map.Entry entry : commandLineParameters.entrySet()) { translateAndInsert(entry.getKey(), entry.getValue(), productionTypeDef, hadoopParameters); } LOG.fine("adding " + commandLineParameters.size() + " command line parameters"); + for (Map.Entry hadoopParameter : hadoopParameters) { + LOG.info(hadoopParameter.getKey() + ": " + hadoopParameter.getValue()); + } + + LOG.info("############################"); + // create job client for user and for access to file system hadoopConnection.createJobClient(hadoopParameters); @@ -137,6 +161,12 @@ public JobConf createJob(String requestPath, Map commandLinePara translateAndInsert(entry.getKey(), entry.getValue(), productionTypeDef, hadoopParameters); } + for (Map.Entry hadoopParameter : hadoopParameters) { + LOG.info(hadoopParameter.getKey() + ": " + hadoopParameter.getValue()); + } + + LOG.info("############################"); + // install processor bundles and calvalus and snap bundle JobConf jobConf = new JobConf(hadoopParameters); hadoopConnection.installProcessorBundles(userName, jobConf); diff --git a/calvalus-production/src/main/java/com/bc/calvalus/production/cli/ProductionTool.java b/calvalus-production/src/main/java/com/bc/calvalus/production/cli/ProductionTool.java index fb160b9d7..69a528627 100644 --- a/calvalus-production/src/main/java/com/bc/calvalus/production/cli/ProductionTool.java +++ b/calvalus-production/src/main/java/com/bc/calvalus/production/cli/ProductionTool.java @@ -34,63 +34,28 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; -import org.apache.xml.security.Init; -import org.apache.xml.security.encryption.XMLCipher; import org.apache.xml.security.encryption.XMLEncryptionException; -import org.apache.xml.security.utils.EncryptionConstants; import org.bouncycastle.jce.provider.BouncyCastleProvider; -import org.bouncycastle.util.io.pem.PemObject; -import org.bouncycastle.util.io.pem.PemReader; import org.esa.snap.core.util.StringUtils; import org.jdom.JDOMException; -import org.jdom2.Element; -import org.jdom2.input.DOMBuilder; -import org.jdom2.output.DOMOutputter; -import org.w3c.dom.Document; import org.xml.sax.SAXException; -import javax.crypto.Cipher; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerException; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.io.OutputStream; -import java.io.StringWriter; -import java.net.HttpURLConnection; -import java.net.URL; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.security.GeneralSecurityException; -import java.security.KeyFactory; -import java.security.NoSuchAlgorithmException; -import java.security.NoSuchProviderException; -import java.security.PrivateKey; import java.security.Security; -import java.security.spec.InvalidKeySpecException; -import java.security.spec.KeySpec; -import java.security.spec.PKCS8EncodedKeySpec; import java.text.SimpleDateFormat; -import java.util.Base64; import java.util.Date; import java.util.List; import java.util.Map; -import java.util.TimeZone; import static com.bc.calvalus.production.ProcessingLogHandler.LOG_STREAM_EMPTY_ERROR_CODE; From b933f3402ec5fee0f36f726d5b6b7b4394280086 Mon Sep 17 00:00:00 2001 From: thomas Date: Mon, 15 Jul 2019 17:48:33 +0200 Subject: [PATCH 2/2] removed debug outputs --- .../cli/CalvalusHadoopRequestConverter.java | 30 ------------------- 1 file changed, 30 deletions(-) diff --git a/calvalus-production/src/main/java/com/bc/calvalus/production/cli/CalvalusHadoopRequestConverter.java b/calvalus-production/src/main/java/com/bc/calvalus/production/cli/CalvalusHadoopRequestConverter.java index ae5bbff8f..b9145e833 100644 --- a/calvalus-production/src/main/java/com/bc/calvalus/production/cli/CalvalusHadoopRequestConverter.java +++ b/calvalus-production/src/main/java/com/bc/calvalus/production/cli/CalvalusHadoopRequestConverter.java @@ -95,12 +95,6 @@ public JobConf createJob(String requestPath, Map commandLinePara } LOG.info("reading .calvalus configuration with " + count + " parameters"); - for (Map.Entry hadoopParameter : hadoopParameters) { - LOG.info(hadoopParameter.getKey() + ": " + hadoopParameter.getValue()); - } - - LOG.info("############################"); - // add parameters of production type, maybe translate and apply function count = 0; for (Map.Entry entry : productionTypeDef.entrySet()) { @@ -112,35 +106,17 @@ public JobConf createJob(String requestPath, Map commandLinePara LOG.info("read production type definition from " + "etc/" + productionType + "-cht-type.json with " + count + " parameters and " + (productionTypeDef.size() - count) + " rules"); - for (Map.Entry hadoopParameter : hadoopParameters) { - LOG.info(hadoopParameter.getKey() + ": " + hadoopParameter.getValue()); - } - - LOG.info("############################"); - // add parameters of request, maybe translate and apply function for (Map.Entry entry : request.entrySet()) { translateAndInsert(entry.getKey(), String.valueOf(entry.getValue()), productionTypeDef, hadoopParameters); } - for (Map.Entry hadoopParameter : hadoopParameters) { - LOG.info(hadoopParameter.getKey() + ": " + hadoopParameter.getValue()); - } - - LOG.info("############################"); - // add parameters of command line, maybe translate and apply function for (Map.Entry entry : commandLineParameters.entrySet()) { translateAndInsert(entry.getKey(), entry.getValue(), productionTypeDef, hadoopParameters); } LOG.fine("adding " + commandLineParameters.size() + " command line parameters"); - for (Map.Entry hadoopParameter : hadoopParameters) { - LOG.info(hadoopParameter.getKey() + ": " + hadoopParameter.getValue()); - } - - LOG.info("############################"); - // create job client for user and for access to file system hadoopConnection.createJobClient(hadoopParameters); @@ -161,12 +137,6 @@ public JobConf createJob(String requestPath, Map commandLinePara translateAndInsert(entry.getKey(), entry.getValue(), productionTypeDef, hadoopParameters); } - for (Map.Entry hadoopParameter : hadoopParameters) { - LOG.info(hadoopParameter.getKey() + ": " + hadoopParameter.getValue()); - } - - LOG.info("############################"); - // install processor bundles and calvalus and snap bundle JobConf jobConf = new JobConf(hadoopParameters); hadoopConnection.installProcessorBundles(userName, jobConf);