diff --git a/mpas_analysis/ocean/plot_depth_integrated_time_series_subtask.py b/mpas_analysis/ocean/plot_depth_integrated_time_series_subtask.py
index bd54eb1d1..d060c28c8 100644
--- a/mpas_analysis/ocean/plot_depth_integrated_time_series_subtask.py
+++ b/mpas_analysis/ocean/plot_depth_integrated_time_series_subtask.py
@@ -18,17 +18,18 @@
 from mpas_analysis.shared.plot.plotting import timeseries_analysis_plot

-from mpas_analysis.shared.generalized_reader import open_multifile_dataset
 from mpas_analysis.shared.io import open_mpas_dataset, write_netcdf

 from mpas_analysis.shared.timekeeping.utility import date_to_days, \
     days_to_datetime

-from mpas_analysis.shared.io.utility import build_config_full_path
+from mpas_analysis.shared.io.utility import build_config_full_path, \
+    make_directories

 from mpas_analysis.shared.html import write_image_xml

-from mpas_analysis.shared.time_series import compute_moving_avg
+from mpas_analysis.shared.time_series import compute_moving_avg, \
+    combine_time_series_with_ncrcat


 class PlotDepthIntegratedTimeSeriesSubtask(AnalysisTask):
@@ -223,7 +224,12 @@ def setup_and_check(self):  # {{{
         baseDirectory = build_config_full_path(
             config, 'output', 'timeSeriesSubdirectory')

-        self.preprocessedFileName = '{}/preprocessed_{}'.format(
+        make_directories('{}/preprocessed'.format(baseDirectory))
+
+        self.preprocessedIntermediateFileName = \
+            '{}/preprocessed/intermediate_{}'.format(baseDirectory,
+                                                     self.inFileName)
+        self.preprocessedFileName = '{}/preprocessed/{}'.format(
             baseDirectory, self.inFileName)

         if not os.path.isabs(self.inFileName):
@@ -337,18 +343,20 @@ def run_task(self):  # {{{
             preprocessedInputDirectory = config.get(
                 'oceanPreprocessedReference', 'baseDirectory')

-            self.logger.info('   Load in OHC from preprocessed reference '
-                             'run...')
+            self.logger.info('   Load in preprocessed reference data...')
             preprocessedFilePrefix = config.get(self.sectionName,
                                                 'preprocessedFilePrefix')
             inFilesPreprocessed = '{}/{}.{}.year*.nc'.format(
                 preprocessedInputDirectory, preprocessedFilePrefix,
                 preprocessedReferenceRunName)
-            dsPreprocessed = open_multifile_dataset(
-                fileNames=inFilesPreprocessed,
-                calendar=calendar,
-                config=config,
-                timeVariableName='xtime')
+
+            combine_time_series_with_ncrcat(
+                inFilesPreprocessed, self.preprocessedIntermediateFileName,
+                logger=self.logger)
+            dsPreprocessed = open_mpas_dataset(
+                fileName=self.preprocessedIntermediateFileName,
+                calendar=calendar,
+                timeVariableNames='xtime')

         yearStart = days_to_datetime(ds.Time.min(), calendar=calendar).year
         yearEnd = days_to_datetime(ds.Time.max(), calendar=calendar).year
@@ -378,7 +386,7 @@ def run_task(self):  # {{{
         if preprocessedReferenceRunName != 'None':
             color = 'purple'
             title = '{} \n {} (purple)'.format(title,
-                                              preprocessedReferenceRunName)
+                                               preprocessedReferenceRunName)
             preprocessedFieldPrefix = config.get(self.sectionName,
                                                  'preprocessedFieldPrefix')
@@ -389,7 +397,7 @@ def run_task(self):  # {{{
             suffixes = ['tot'] + ['{}m'.format(depth) for depth in
                                   divisionDepths] + ['btm']

-            # these preprocessed data are OHC *anomalies*
+            # these preprocessed data are already anomalies
             dsPreprocessed = compute_moving_avg(dsPreprocessed,
                                                 movingAveragePoints)
             for rangeIndex in range(len(suffixes)):
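The change above swaps a lazy, multi-file read (open_multifile_dataset) for a one-time ncrcat concatenation into a cached intermediate file that is then opened normally. A minimal sketch of the resulting call pattern (the paths and calendar value here are illustrative, not taken from the code above):

    from mpas_analysis.shared.io import open_mpas_dataset
    from mpas_analysis.shared.io.utility import make_directories
    from mpas_analysis.shared.time_series import \
        combine_time_series_with_ncrcat

    # cache the combined series under the time-series output tree
    outFolder = 'analysis_output/timeseries/preprocessed'  # illustrative
    make_directories(outFolder)
    outFileName = '{}/intermediate_ohc.nc'.format(outFolder)

    # concatenate the yearly preprocessed files once; later runs reuse
    # the cached result because the helper skips existing output files
    combine_time_series_with_ncrcat('ref_run/OHC.*.year*.nc', outFileName)

    # open the single combined file with MPAS time handling
    dsPreprocessed = open_mpas_dataset(fileName=outFileName,
                                       calendar='gregorian_noleap',
                                       timeVariableNames='xtime')
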
diff --git a/mpas_analysis/ocean/time_series_sst.py b/mpas_analysis/ocean/time_series_sst.py
index 61e15e2c7..2afad9bf1 100644
--- a/mpas_analysis/ocean/time_series_sst.py
+++ b/mpas_analysis/ocean/time_series_sst.py
@@ -13,7 +13,7 @@
 from mpas_analysis.shared.plot.plotting import timeseries_analysis_plot

-from mpas_analysis.shared.generalized_reader import open_multifile_dataset
+from mpas_analysis.shared.time_series import combine_time_series_with_ncrcat
 from mpas_analysis.shared.io import open_mpas_dataset

 from mpas_analysis.shared.timekeeping.utility import date_to_days, \
@@ -200,11 +200,16 @@ def run_task(self):  # {{{
                          'run...')
         inFilesPreprocessed = '{}/SST.{}.year*.nc'.format(
             preprocessedInputDirectory, preprocessedReferenceRunName)
-        dsPreprocessed = open_multifile_dataset(
-            fileNames=inFilesPreprocessed,
-            calendar=calendar,
-            config=config,
-            timeVariableName='xtime')
+
+        outFolder = '{}/preprocessed'.format(outputDirectory)
+        make_directories(outFolder)
+        outFileName = '{}/sst.nc'.format(outFolder)
+
+        combine_time_series_with_ncrcat(inFilesPreprocessed,
+                                        outFileName, logger=self.logger)
+        dsPreprocessed = open_mpas_dataset(fileName=outFileName,
+                                           calendar=calendar,
+                                           timeVariableNames='xtime')
         yearEndPreprocessed = days_to_datetime(dsPreprocessed.Time.max(),
                                                calendar=calendar).year
         if yearStart <= yearEndPreprocessed:
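As the trailing context of this hunk shows, once the combined reference file is opened, the existing overlap test between the reference series and the main run is unchanged. Roughly (a sketch reusing the variable names defined in the hunks above):

    # Time is stored as days since the simulation reference date, so
    # convert the last time in the reference series back to a year
    yearEndPreprocessed = days_to_datetime(dsPreprocessed.Time.max(),
                                           calendar=calendar).year
    if yearStart <= yearEndPreprocessed:
        # the reference data overlap the main run, so trim to the
        # common time span before plotting
        dsPreprocessedTimeSlice = dsPreprocessed.sel(
            Time=slice(timeStart, timeEnd))
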
diff --git a/mpas_analysis/sea_ice/time_series.py b/mpas_analysis/sea_ice/time_series.py
index 4980502f8..ec84f8528 100644
--- a/mpas_analysis/sea_ice/time_series.py
+++ b/mpas_analysis/sea_ice/time_series.py
@@ -24,7 +24,7 @@
 from mpas_analysis.shared.timekeeping.MpasRelativeDelta import \
     MpasRelativeDelta

-from mpas_analysis.shared.generalized_reader import open_multifile_dataset
+from mpas_analysis.shared.time_series import combine_time_series_with_ncrcat
 from mpas_analysis.shared.io import open_mpas_dataset, write_netcdf
 from mpas_analysis.shared.mpas_xarray.mpas_xarray import subset_variables
@@ -261,13 +261,20 @@ def run_task(self):  # {{{
                               calendar=calendar)

         if preprocessedReferenceRunName != 'None':
+            # determine if we're beyond the end of the preprocessed data
+            # (and go ahead and cache the data set while we're checking)
+            outFolder = '{}/preprocessed'.format(outputDirectory)
+            make_directories(outFolder)
             inFilesPreprocessed = '{}/icevol.{}.year*.nc'.format(
                 preprocessedReferenceDirectory, preprocessedReferenceRunName)
-            dsPreprocessed = open_multifile_dataset(
-                fileNames=inFilesPreprocessed,
-                calendar=calendar,
-                config=config,
-                timeVariableName='xtime')
+            outFileName = '{}/iceVolume.nc'.format(outFolder)
+
+            combine_time_series_with_ncrcat(inFilesPreprocessed,
+                                            outFileName,
+                                            logger=self.logger)
+            dsPreprocessed = open_mpas_dataset(fileName=outFileName,
+                                               calendar=calendar,
+                                               timeVariableNames='xtime')
             preprocessedYearEnd = days_to_datetime(dsPreprocessed.Time.max(),
                                                    calendar=calendar).year
             if yearStart <= preprocessedYearEnd:
@@ -349,22 +356,31 @@ def run_task(self):  # {{{
             key = (hemisphere, variableName)

         if compareWithObservations:
-            dsObs = open_multifile_dataset(
-                fileNames=obsFileNames['iceArea'][hemisphere],
-                calendar=calendar,
-                config=config,
-                timeVariableName='xtime')
+
+            outFolder = '{}/obs'.format(outputDirectory)
+            make_directories(outFolder)
+            outFileName = '{}/iceArea{}.nc'.format(outFolder, hemisphere)
+
+            combine_time_series_with_ncrcat(
+                obsFileNames['iceArea'][hemisphere],
+                outFileName, logger=self.logger)
+            dsObs = open_mpas_dataset(fileName=outFileName,
+                                      calendar=calendar,
+                                      timeVariableNames='xtime')
             key = (hemisphere, 'iceArea')
             obs[key] = self._replicate_cycle(plotVars[key], dsObs.IceArea,
                                              calendar)

             key = (hemisphere, 'iceVolume')
             if hemisphere == 'NH':
-                dsObs = open_multifile_dataset(
-                    fileNames=obsFileNames['iceVolume'][hemisphere],
-                    calendar=calendar,
-                    config=config,
-                    timeVariableName='xtime')
+                outFileName = '{}/iceVolume{}.nc'.format(outFolder,
+                                                         hemisphere)
+                combine_time_series_with_ncrcat(
+                    obsFileNames['iceVolume'][hemisphere],
+                    outFileName, logger=self.logger)
+                dsObs = open_mpas_dataset(fileName=outFileName,
+                                          calendar=calendar,
+                                          timeVariableNames='xtime')
                 obs[key] = self._replicate_cycle(plotVars[key], dsObs.IceVol,
                                                  calendar)
@@ -372,14 +388,19 @@ def run_task(self):  # {{{
                 obs[key] = None

         if preprocessedReferenceRunName != 'None':
+            outFolder = '{}/preprocessed'.format(outputDirectory)
             inFilesPreprocessed = '{}/icearea.{}.year*.nc'.format(
                 preprocessedReferenceDirectory, preprocessedReferenceRunName)
-            dsPreprocessed = open_multifile_dataset(
-                fileNames=inFilesPreprocessed,
-                calendar=calendar,
-                config=config,
-                timeVariableName='xtime')
+
+            outFileName = '{}/iceArea.nc'.format(outFolder)
+
+            combine_time_series_with_ncrcat(inFilesPreprocessed,
+                                            outFileName,
+                                            logger=self.logger)
+            dsPreprocessed = open_mpas_dataset(fileName=outFileName,
+                                               calendar=calendar,
+                                               timeVariableNames='xtime')
             dsPreprocessedTimeSlice = dsPreprocessed.sel(
                 Time=slice(timeStart, timeEnd))
             key = (hemisphere, 'iceArea')
@@ -389,11 +410,14 @@ def run_task(self):  # {{{
             inFilesPreprocessed = '{}/icevol.{}.year*.nc'.format(
                 preprocessedReferenceDirectory, preprocessedReferenceRunName)
-            dsPreprocessed = open_multifile_dataset(
-                fileNames=inFilesPreprocessed,
-                calendar=calendar,
-                config=config,
-                timeVariableName='xtime')
+            outFileName = '{}/iceVolume.nc'.format(outFolder)
+
+            combine_time_series_with_ncrcat(inFilesPreprocessed,
+                                            outFileName,
+                                            logger=self.logger)
+            dsPreprocessed = open_mpas_dataset(fileName=outFileName,
+                                               calendar=calendar,
+                                               timeVariableNames='xtime')
             dsPreprocessedTimeSlice = dsPreprocessed.sel(
                 Time=slice(timeStart, timeEnd))
             key = (hemisphere, 'iceVolume')
diff --git a/mpas_analysis/shared/time_series/__init__.py b/mpas_analysis/shared/time_series/__init__.py
index 627ff0408..4062e3102 100644
--- a/mpas_analysis/shared/time_series/__init__.py
+++ b/mpas_analysis/shared/time_series/__init__.py
@@ -1,4 +1,5 @@
-from mpas_analysis.shared.time_series.time_series import cache_time_series
+from mpas_analysis.shared.time_series.time_series import cache_time_series, \
+    combine_time_series_with_ncrcat

 from mpas_analysis.shared.time_series.mpas_time_series_task import \
     MpasTimeSeriesTask
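With the re-export in __init__.py above, the helper is available from the package namespace, so callers do not need to know which module implements it:

    # both imports resolve to the same function after this change
    from mpas_analysis.shared.time_series import \
        combine_time_series_with_ncrcat
    from mpas_analysis.shared.time_series.time_series import \
        combine_time_series_with_ncrcat
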
diff --git a/mpas_analysis/shared/time_series/time_series.py b/mpas_analysis/shared/time_series/time_series.py
index 3f2b2c187..f3a28175f 100644
--- a/mpas_analysis/shared/time_series/time_series.py
+++ b/mpas_analysis/shared/time_series/time_series.py
@@ -18,10 +18,99 @@
 import xarray as xr
 import numpy
 import os
+from distutils.spawn import find_executable
+import glob
+import subprocess
+from six import string_types

 from mpas_analysis.shared.timekeeping.utility import days_to_datetime


+def combine_time_series_with_ncrcat(inFileNames, outFileName,
+                                    variableList=None, logger=None):
+    # {{{
+    '''
+    Uses ncrcat to extract a time series from a series of files
+
+    Parameters
+    ----------
+    inFileNames : str or list of str
+        A file name with wildcard(s) or a list of input files from which to
+        extract the time series.
+
+    outFileName : str
+        The output NetCDF file where the time series should be written.
+
+    variableList : list of str, optional
+        A list of variables to include.  All variables are included by
+        default.
+
+    logger : ``logging.Logger``, optional
+        A logger to which ncrcat output should be redirected
+
+    Raises
+    ------
+    OSError
+        If ``ncrcat`` is not in the system path.
+
+    Author
+    ------
+    Xylar Asay-Davis
+    '''
+
+    if find_executable('ncrcat') is None:
+        raise OSError('ncrcat not found. Make sure the latest nco '
+                      'package is installed: \n'
+                      'conda install nco\n'
+                      'Note: this presumes use of the conda-forge '
+                      'channel.')
+
+    if os.path.exists(outFileName):
+        return
+
+    if isinstance(inFileNames, string_types):
+        inFileNames = sorted(glob.glob(inFileNames))
+
+    args = ['ncrcat', '-4', '--record_append', '--no_tmp_fl']
+
+    if variableList is not None:
+        args.extend(['-v', ','.join(variableList)])
+
+    printCommand = '{} {} ... {} {}'.format(' '.join(args), inFileNames[0],
+                                            inFileNames[-1],
+                                            outFileName)
+    args.extend(inFileNames)
+    args.append(outFileName)
+
+    if logger is None:
+        print('running: {}'.format(printCommand))
+    else:
+        logger.info('running: {}'.format(printCommand))
+        for handler in logger.handlers:
+            handler.flush()
+    process = subprocess.Popen(args, stdout=subprocess.PIPE,
+                               stderr=subprocess.PIPE)
+    stdout, stderr = process.communicate()
+
+    if stdout:
+        stdout = stdout.decode('utf-8')
+        for line in stdout.split('\n'):
+            if logger is None:
+                print(line)
+            else:
+                logger.info(line)
+    if stderr:
+        stderr = stderr.decode('utf-8')
+        for line in stderr.split('\n'):
+            if logger is None:
+                print(line)
+            else:
+                logger.error(line)
+
+    if process.returncode != 0:
+        raise subprocess.CalledProcessError(process.returncode,
+                                            ' '.join(args))
+
+    # }}}
+
+
 def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName,
                       calendar, yearsPerCacheUpdate=1,
                       logger=None):  # {{{
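A brief usage sketch of the new helper (the file names here are hypothetical). Because the function returns as soon as the output file exists, repeated analysis runs reuse the cached result instead of invoking ncrcat again:

    import logging

    logger = logging.getLogger('mpas_analysis')

    # the first call sorts the glob matches and runs ncrcat, writing
    # sst.nc; variableList trims the output to just the fields needed
    combine_time_series_with_ncrcat('ref_run/SST.*.year*.nc',
                                    'cache/preprocessed/sst.nc',
                                    variableList=['SST', 'xtime'],
                                    logger=logger)

    # a second, identical call is a no-op because
    # cache/preprocessed/sst.nc already exists
    combine_time_series_with_ncrcat('ref_run/SST.*.year*.nc',
                                    'cache/preprocessed/sst.nc',
                                    logger=logger)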