Commit

Merge pull request MPAS-Dev#411 from xylar/use_ncrcat_for_obs
Update all multifile datasets to use ncrcat

Some obs. and preprocessed datasets were being opened with the xarray.open_mfdataset() function (via the "generalized reader" in MPAS-Analysis), which sometimes led to "too many open files" errors. These datasets are now concatenated into a single file with ncrcat and opened with the single-file reader instead (see the sketch below).
xylar authored May 30, 2018
2 parents c702fa9 + 34e574a commit 2eea3fa
Showing 5 changed files with 173 additions and 46 deletions.
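The new pattern, in a minimal sketch (the file paths and calendar value below are hypothetical placeholders; combine_time_series_with_ncrcat is the helper added in this commit, and open_mpas_dataset is the existing single-file reader):

from mpas_analysis.shared.time_series import combine_time_series_with_ncrcat
from mpas_analysis.shared.io import open_mpas_dataset

# hypothetical wildcard matching one file per simulated year
inFileNames = 'preprocessed/SST.myRun.year*.nc'
outFileName = 'timeseries/preprocessed/sst.nc'

# concatenate along the record (time) dimension with ncrcat; the helper
# returns early if outFileName already exists
combine_time_series_with_ncrcat(inFileNames, outFileName, logger=None)

# open the single concatenated file -- one file handle instead of many
dsPreprocessed = open_mpas_dataset(fileName=outFileName,
                                   calendar='gregorian_noleap',  # hypothetical
                                   timeVariableNames='xtime')

Because the concatenated file is cached on disk, subsequent analysis runs skip the ncrcat step entirely.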
34 changes: 21 additions & 13 deletions mpas_analysis/ocean/plot_depth_integrated_time_series_subtask.py
@@ -18,17 +18,18 @@

from mpas_analysis.shared.plot.plotting import timeseries_analysis_plot

from mpas_analysis.shared.generalized_reader import open_multifile_dataset
from mpas_analysis.shared.io import open_mpas_dataset, write_netcdf

from mpas_analysis.shared.timekeeping.utility import date_to_days, \
days_to_datetime

from mpas_analysis.shared.io.utility import build_config_full_path
from mpas_analysis.shared.io.utility import build_config_full_path, \
make_directories

from mpas_analysis.shared.html import write_image_xml

from mpas_analysis.shared.time_series import compute_moving_avg
from mpas_analysis.shared.time_series import compute_moving_avg, \
combine_time_series_with_ncrcat


class PlotDepthIntegratedTimeSeriesSubtask(AnalysisTask):
@@ -223,7 +224,12 @@ def setup_and_check(self):  # {{{
baseDirectory = build_config_full_path(
config, 'output', 'timeSeriesSubdirectory')

self.preprocessedFileName = '{}/preprocessed_{}'.format(
make_directories('{}/preprocessed'.format(baseDirectory))

self.preprocessedIntermediateFileName = \
'{}/preprocessed/intermediate_{}'.format(baseDirectory,
self.inFileName)
self.preprocessedFileName = '{}/preprocessed/{}'.format(
baseDirectory, self.inFileName)

if not os.path.isabs(self.inFileName):
@@ -337,18 +343,20 @@ def run_task(self):  # {{{
preprocessedInputDirectory = config.get(
'oceanPreprocessedReference', 'baseDirectory')

self.logger.info(' Load in OHC from preprocessed reference '
'run...')
self.logger.info(' Load in preprocessed reference data...')
preprocessedFilePrefix = config.get(self.sectionName,
'preprocessedFilePrefix')
inFilesPreprocessed = '{}/{}.{}.year*.nc'.format(
preprocessedInputDirectory, preprocessedFilePrefix,
preprocessedReferenceRunName)
dsPreprocessed = open_multifile_dataset(
fileNames=inFilesPreprocessed,
calendar=calendar,
config=config,
timeVariableName='xtime')

combine_time_series_with_ncrcat(
inFilesPreprocessed, self.preprocessedIntermediateFileName,
logger=self.logger)
dsPreprocessed = open_mpas_dataset(
fileName=self.preprocessedIntermediateFileName,
calendar=calendar,
timeVariableNames='xtime')

yearStart = days_to_datetime(ds.Time.min(), calendar=calendar).year
yearEnd = days_to_datetime(ds.Time.max(), calendar=calendar).year
@@ -378,7 +386,7 @@ def run_task(self):  # {{{
if preprocessedReferenceRunName != 'None':
color = 'purple'
title = '{} \n {} (purple)'.format(title,
preprocessedReferenceRunName)
preprocessedReferenceRunName)

preprocessedFieldPrefix = config.get(self.sectionName,
'preprocessedFieldPrefix')
@@ -389,7 +397,7 @@ def run_task(self):  # {{{
suffixes = ['tot'] + ['{}m'.format(depth) for depth in
divisionDepths] + ['btm']

# these preprocessed data are OHC *anomalies*
# these preprocessed data are already anomalies
dsPreprocessed = compute_moving_avg(dsPreprocessed,
movingAveragePoints)
for rangeIndex in range(len(suffixes)):
17 changes: 11 additions & 6 deletions mpas_analysis/ocean/time_series_sst.py
@@ -13,7 +13,7 @@

from mpas_analysis.shared.plot.plotting import timeseries_analysis_plot

from mpas_analysis.shared.generalized_reader import open_multifile_dataset
from mpas_analysis.shared.time_series import combine_time_series_with_ncrcat
from mpas_analysis.shared.io import open_mpas_dataset

from mpas_analysis.shared.timekeeping.utility import date_to_days, \
@@ -200,11 +200,16 @@ def run_task(self):  # {{{
'run...')
inFilesPreprocessed = '{}/SST.{}.year*.nc'.format(
preprocessedInputDirectory, preprocessedReferenceRunName)
dsPreprocessed = open_multifile_dataset(
fileNames=inFilesPreprocessed,
calendar=calendar,
config=config,
timeVariableName='xtime')

outFolder = '{}/preprocessed'.format(outputDirectory)
make_directories(outFolder)
outFileName = '{}/sst.nc'.format(outFolder)

combine_time_series_with_ncrcat(inFilesPreprocessed,
outFileName, logger=self.logger)
dsPreprocessed = open_mpas_dataset(fileName=outFileName,
calendar=calendar,
timeVariableNames='xtime')
yearEndPreprocessed = days_to_datetime(dsPreprocessed.Time.max(),
calendar=calendar).year
if yearStart <= yearEndPreprocessed:
76 changes: 50 additions & 26 deletions mpas_analysis/sea_ice/time_series.py
@@ -24,7 +24,7 @@
from mpas_analysis.shared.timekeeping.MpasRelativeDelta import \
MpasRelativeDelta

from mpas_analysis.shared.generalized_reader import open_multifile_dataset
from mpas_analysis.shared.time_series import combine_time_series_with_ncrcat
from mpas_analysis.shared.io import open_mpas_dataset, write_netcdf
from mpas_analysis.shared.mpas_xarray.mpas_xarray import subset_variables

@@ -261,13 +261,20 @@ def run_task(self):  # {{{
calendar=calendar)

if preprocessedReferenceRunName != 'None':
# determine if we're beyond the end of the preprocessed data
# (and go ahead and cache the data set while we're checking)
outFolder = '{}/preprocessed'.format(outputDirectory)
make_directories(outFolder)
inFilesPreprocessed = '{}/icevol.{}.year*.nc'.format(
preprocessedReferenceDirectory, preprocessedReferenceRunName)
dsPreprocessed = open_multifile_dataset(
fileNames=inFilesPreprocessed,
calendar=calendar,
config=config,
timeVariableName='xtime')
outFileName = '{}/iceVolume.nc'.format(outFolder)

combine_time_series_with_ncrcat(inFilesPreprocessed,
outFileName,
logger=self.logger)
dsPreprocessed = open_mpas_dataset(fileName=outFileName,
calendar=calendar,
timeVariableNames='xtime')
preprocessedYearEnd = days_to_datetime(dsPreprocessed.Time.max(),
calendar=calendar).year
if yearStart <= preprocessedYearEnd:
@@ -349,37 +356,51 @@ def run_task(self):  # {{{
key = (hemisphere, variableName)

if compareWithObservations:
dsObs = open_multifile_dataset(
fileNames=obsFileNames['iceArea'][hemisphere],
calendar=calendar,
config=config,
timeVariableName='xtime')

outFolder = '{}/obs'.format(outputDirectory)
make_directories(outFolder)
outFileName = '{}/iceArea{}.nc'.format(outFolder, hemisphere)

combine_time_series_with_ncrcat(
obsFileNames['iceArea'][hemisphere],
outFileName, logger=self.logger)
dsObs = open_mpas_dataset(fileName=outFileName,
calendar=calendar,
timeVariableNames='xtime')
key = (hemisphere, 'iceArea')
obs[key] = self._replicate_cycle(plotVars[key], dsObs.IceArea,
calendar)

key = (hemisphere, 'iceVolume')
if hemisphere == 'NH':
dsObs = open_multifile_dataset(
fileNames=obsFileNames['iceVolume'][hemisphere],
calendar=calendar,
config=config,
timeVariableName='xtime')
outFileName = '{}/iceVolume{}.nc'.format(outFolder,
hemisphere)
combine_time_series_with_ncrcat(
obsFileNames['iceVolume'][hemisphere],
outFileName, logger=self.logger)
dsObs = open_mpas_dataset(fileName=outFileName,
calendar=calendar,
timeVariableNames='xtime')
obs[key] = self._replicate_cycle(plotVars[key],
dsObs.IceVol,
calendar)
else:
obs[key] = None

if preprocessedReferenceRunName != 'None':
outFolder = '{}/preprocessed'.format(outputDirectory)
inFilesPreprocessed = '{}/icearea.{}.year*.nc'.format(
preprocessedReferenceDirectory,
preprocessedReferenceRunName)
dsPreprocessed = open_multifile_dataset(
fileNames=inFilesPreprocessed,
calendar=calendar,
config=config,
timeVariableName='xtime')

outFileName = '{}/iceArea.nc'.format(outFolder)

combine_time_series_with_ncrcat(inFilesPreprocessed,
outFileName,
logger=self.logger)
dsPreprocessed = open_mpas_dataset(fileName=outFileName,
calendar=calendar,
timeVariableNames='xtime')
dsPreprocessedTimeSlice = dsPreprocessed.sel(
Time=slice(timeStart, timeEnd))
key = (hemisphere, 'iceArea')
@@ -389,11 +410,14 @@ def run_task(self):  # {{{
inFilesPreprocessed = '{}/icevol.{}.year*.nc'.format(
preprocessedReferenceDirectory,
preprocessedReferenceRunName)
dsPreprocessed = open_multifile_dataset(
fileNames=inFilesPreprocessed,
calendar=calendar,
config=config,
timeVariableName='xtime')
outFileName = '{}/iceVolume.nc'.format(outFolder)

combine_time_series_with_ncrcat(inFilesPreprocessed,
outFileName,
logger=self.logger)
dsPreprocessed = open_mpas_dataset(fileName=outFileName,
calendar=calendar,
timeVariableNames='xtime')
dsPreprocessedTimeSlice = dsPreprocessed.sel(
Time=slice(timeStart, timeEnd))
key = (hemisphere, 'iceVolume')
3 changes: 2 additions & 1 deletion mpas_analysis/shared/time_series/__init__.py
@@ -1,4 +1,5 @@
from mpas_analysis.shared.time_series.time_series import cache_time_series
from mpas_analysis.shared.time_series.time_series import cache_time_series, \
combine_time_series_with_ncrcat
from mpas_analysis.shared.time_series.mpas_time_series_task import \
MpasTimeSeriesTask

89 changes: 89 additions & 0 deletions mpas_analysis/shared/time_series/time_series.py
@@ -18,10 +18,99 @@
import xarray as xr
import numpy
import os
from distutils.spawn import find_executable
import glob
import subprocess
from six import string_types

from mpas_analysis.shared.timekeeping.utility import days_to_datetime


def combine_time_series_with_ncrcat(inFileNames, outFileName,
variableList=None, logger=None):
# {{{
'''
Uses ncrcat to extract a time series from a series of files.

Parameters
----------
inFileNames : str or list of str
    A file name with wildcard(s) or a list of input files from which to
    extract the time series.

outFileName : str
    The output NetCDF file where the time series should be written.

variableList : list of str, optional
    A list of variables to include. All variables are included by default.

logger : ``logging.Logger``, optional
    A logger to which ncrcat output should be redirected.

Raises
------
OSError
    If ``ncrcat`` is not in the system path.

Author
------
Xylar Asay-Davis
'''

if find_executable('ncrcat') is None:
raise OSError('ncrcat not found. Make sure the latest nco '
'package is installed: \n'
'conda install nco\n'
'Note: this presumes use of the conda-forge '
'channel.')

if os.path.exists(outFileName):
return

if isinstance(inFileNames, string_types):
inFileNames = sorted(glob.glob(inFileNames))

args = ['ncrcat', '-4', '--record_append', '--no_tmp_fl']

if variableList is not None:
args.extend(['-v', ','.join(variableList)])

printCommand = '{} {} ... {} {}'.format(' '.join(args), inFileNames[0],
inFileNames[-1],
outFileName)
args.extend(inFileNames)
args.append(outFileName)

if logger is None:
print('running: {}'.format(printCommand))
else:
logger.info('running: {}'.format(printCommand))
for handler in logger.handlers:
handler.flush()
process = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = process.communicate()

if stdout:
stdout = stdout.decode('utf-8')
for line in stdout.split('\n'):
if logger is None:
print(line)
else:
logger.info(line)
if stderr:
stderr = stderr.decode('utf-8')
for line in stderr.split('\n'):
if logger is None:
print(line)
else:
logger.error(line)

if process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode,
' '.join(args))

# }}}


def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName,
calendar, yearsPerCacheUpdate=1,
logger=None): # {{{
