diff --git a/src/lasdi/enums.py b/src/lasdi/enums.py index 33b9bd8..ce24936 100644 --- a/src/lasdi/enums.py +++ b/src/lasdi/enums.py @@ -1,13 +1,13 @@ from enum import Enum class NextStep(Enum): - Train = 1 - PickSample = 2 - RunSample = 3 - CollectSample = 4 + Train = 1 + PickSample = 2 + RunSample = 3 + CollectSample = 4 class Result(Enum): - Unexecuted = 1 - Success = 2 - Fail = 3 - Complete = 4 \ No newline at end of file + Unexecuted = 1 + Success = 2 + Fail = 3 + Complete = 4 \ No newline at end of file diff --git a/src/lasdi/gp.py b/src/lasdi/gp.py index 3af98ff..ddc3fc5 100644 --- a/src/lasdi/gp.py +++ b/src/lasdi/gp.py @@ -1,80 +1,194 @@ -import numpy as np -from sklearn.gaussian_process.kernels import ConstantKernel, Matern, RBF -from sklearn.gaussian_process import GaussianProcessRegressor +import numpy as np +from sklearn.gaussian_process.kernels import ConstantKernel, Matern, RBF +from sklearn.gaussian_process import GaussianProcessRegressor -def fit_gps(X, Y): - ''' - Trains each GP given the interpolation dataset. - X: (n_train, n_param) numpy 2d array - Y: (n_train, n_coef) numpy 2d array + + +# ------------------------------------------------------------------------------------------------- +# Gaussian Process functions! +# ------------------------------------------------------------------------------------------------- + +def fit_gps(X : np.ndarray, Y : np.ndarray) -> list[GaussianProcessRegressor]: + """ + Trains a GP for each column of Y. If Y has shape N x k, then we train k GP regressors. In this + case, we assume that X has shape N x M. Thus, the Input to the GP is in \mathbb{R}^M. For each + k, we train a GP where the i'th row of X is the input and the i,k component of Y is the + corresponding target. Thus, we return a list of k GP Regressor objects, the k'th one of which + makes predictions for the k'th coefficient in the latent dynamics. + We assume each target coefficient is independent with each other. - gp_dictionnary is a dataset containing the trained GPs (as sklearn objects) - ''' - n_coef = 1 if (Y.ndim == 1) else Y.shape[1] + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + X: A 2d numpy array of shape (n_train, input_dim), where n_train is the number of training + examples and input_dim is the number of components in each input (e.g., the number of + parameters) + + Y: A 2d numpy array of shape (n_train, n_coef), where n_train is the number of training + examples and n_coef is the number of coefficients in the latent dynamics. + + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + A list of trained GP regressor objects. If Y has k columns, then the returned list has k + elements. It's i'th element holds a trained GP regressor object whose training inputs are the + columns of X and whose corresponding target values are the elements of the i'th column of Y. + """ + + # Determine the number of components (columns) of Y. Since this is a regression task, we will + # perform a GP regression fit on each component (column) of Y. + n_coef : int = 1 if (Y.ndim == 1) else Y.shape[1] + + # Transpose Y so that each row corresponds to a particular coefficient. This allows us to + # iterate over the coefficients by iterating through the rows of Y. 
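    # Illustrative shapes (an example, not part of the algorithm): if Y has shape (100, 3) -- 100
    # training points and 3 latent dynamics coefficients -- then Y.T has shape (3, 100), and
    # iterating over its rows yields one length-100 target vector per coefficient, which is what
    # the per-coefficient GP fits below expect.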
if (n_coef > 1): Y = Y.T + # Sklearn requires X to be a 2D array... so make sure this holds. if X.ndim == 1: X = X.reshape(-1, 1) - gp_dictionnary = [] + # Initialize a list to hold the trained GP objects. + gp_list : list[GaussianProcessRegressor] = [] + # Cycle through the rows of Y (which we transposed... so this just cycles through the + # coefficients) for yk in Y: + # Make the kernel. # kernel = ConstantKernel() * Matern(length_scale_bounds = (0.01, 1e5), nu = 1.5) - kernel = ConstantKernel() * RBF(length_scale_bounds = (0.1, 1e5)) + kernel = ConstantKernel() * RBF(length_scale_bounds = (0.1, 1e5)) - gp = GaussianProcessRegressor(kernel = kernel, n_restarts_optimizer = 10, random_state = 1) - gp.fit(X, yk) + # Initialize the GP object. + gp = GaussianProcessRegressor(kernel = kernel, n_restarts_optimizer = 10, random_state = 1) - gp_dictionnary += [gp] + # Fit it to the data (train), then add it to the list of trained GPs + gp.fit(X, yk) + gp_list += [gp] - return gp_dictionnary + # All done! + return gp_list -def eval_gp(gp_dictionnary, param_grid): - ''' +def eval_gp(gp_list : list[GaussianProcessRegressor], param_grid : np.ndarray) -> tuple[np.ndarray, np.ndarray]: + """ Computes the GPs predictive mean and standard deviation for points of the parameter space grid - ''' - n_coef = len(gp_dictionnary) + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + gp_list: a list of trained GP regressor objects. The number of elements in this list should + match the number of columns in param_grid. The i'th element of this list is a GP regressor + object that predicts the i'th coefficient. + + param_grid: A 2d numpy.ndarray object of shape (number of parameter combination, number of + parameters). The i,j element of this array specifies the value of the j'th parameter in the + i'th combination of parameters. We use this as the testing set for the GP evaluation. + + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + A two element tuple. Both are 2d numpy arrays of shape (number of parameter combinations, + number of coefficients). The two arrays hold the predicted means and std's for each parameter + at each training example, respectively. + + Thus, the i,j element of the first return variable holds the predicted mean of the j'th + coefficient in the latent dynamics at the i'th training example. Likewise, the i,j element of + the second return variable holds the standard deviation in the predicted distribution for the + j'th coefficient in the latent dynamics at the i'th combination of parameter values. + """ + + # Fetch the numbers coefficients. Since there is one GP Regressor per SINDy coefficient, this + # just the length of the gp_list. + n_coef : int = len(gp_list) + + # Fetch the number of parameters, make sure the grid is 2D. if (param_grid.ndim == 1): param_grid = param_grid.reshape(1, -1) n_points = param_grid.shape[0] + # Initialize arrays to hold the mean, STD. pred_mean, pred_std = np.zeros([n_points, n_coef]), np.zeros([n_points, n_coef]) - for k, gp in enumerate(gp_dictionnary): + + # Cycle through the GPs (one for each coefficient in the SINDy coefficients!). + for k, gp in enumerate(gp_list): + # Make predictions using the parameters in the param_grid. 
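        # sklearn's GaussianProcessRegressor.predict with return_std = True returns two 1d arrays
        # (a mean and a standard deviation for each row of param_grid); these fill the k'th
        # columns of pred_mean and pred_std, respectively.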
pred_mean[:, k], pred_std[:, k] = gp.predict(param_grid, return_std = True) + # All done! return pred_mean, pred_std -def sample_coefs(gp_dictionnary, param, n_samples): - ''' - Generates sample sets of ODEs for one given parameter. - coef_samples is a list of length n_samples, where each terms is a matrix of SINDy coefficients sampled from the GP predictive - distributions +def sample_coefs( gp_list : list[GaussianProcessRegressor], + param : np.ndarray, + n_samples : int): + """ + Generates sets of ODE (SINDy) coefficients sampled from the predictive distribution for those + coefficients at the specified parameter value (parma). Specifically, for the k'th SINDy + coefficient, we draw n_samples samples of the predictive distribution for the k'th coefficient + when param is the parameter. + + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + gp_list: a list of trained GP regressor objects. The number of elements in this list should + match the number of columns in param_grid. The i'th element of this list is a GP regressor + object that predicts the i'th coefficient. + + param: A combination of parameter values. i.e., a single test example. We evaluate each GP in + the gp_list at this parameter value (getting a prediction for each coefficient). + + n_samples: Number of samples of the predicted latent dynamics used to build ensemble of fom + predictions. N_s in the paper. + + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + A 2d numpy ndarray object called coef_samples. It has shape (n_samples, n_coef), where n_coef + is the number of coefficients (length of gp_list). The i,j element of this list is the i'th + sample of the j'th SINDy coefficient. + """ - ''' + # Fetch the number of coefficients (since there is one GP Regressor per coefficient, this is + # just the length of the gp_list). + n_coef : int = len(gp_list) - n_coef = len(gp_dictionnary) - coef_samples = np.zeros([n_samples, n_coef]) + # Initialize an array to hold the coefficient samples. + coef_samples : np.ndarray = np.zeros([n_samples, n_coef]) + # Make sure param is a 2d array with one row, we need this when evaluating the GP Regressor + # object. if param.ndim == 1: param = param.reshape(1, -1) - n_points = param.shape[0] + + # Make sure we only have a single sample. + n_points : int = param.shape[0] assert(n_points == 1) - pred_mean, pred_std = eval_gp(gp_dictionnary, param) + # Evaluate the predicted mean and std at the parameter value. + pred_mean, pred_std = eval_gp(gp_list, param) pred_mean, pred_std = pred_mean[0], pred_std[0] + # Cycle through the samples and coefficients. For each sample of the k'th coefficient, we draw + # a sample from the normal distribution with mean pred_mean[k] and std pred_std[k]. for s in range(n_samples): for k in range(n_coef): coef_samples[s, k] = np.random.normal(pred_mean[k], pred_std[k]) + # All done! 
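    # Aside (illustrative only, not a behavior change): because each entry is an independent
    # normal draw, the nested loop above is equivalent to a single broadcast call, e.g.
    #
    #   coef_samples = np.random.normal(pred_mean, pred_std, size = (n_samples, n_coef))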
return coef_samples \ No newline at end of file diff --git a/src/lasdi/gplasdi.py b/src/lasdi/gplasdi.py index ac3d046..02cbe1d 100644 --- a/src/lasdi/gplasdi.py +++ b/src/lasdi/gplasdi.py @@ -1,80 +1,306 @@ -from .gp import * -from .latent_space import * -from .enums import * -from .timing import Timer -import torch -import time -import numpy as np +# ------------------------------------------------------------------------------------------------- +# Imports +# ------------------------------------------------------------------------------------------------- + +import time + +import torch +import numpy as np +from torch.optim import Optimizer +from sklearn.gaussian_process import GaussianProcessRegressor + +from .gp import eval_gp, sample_coefs, fit_gps +from .latent_space import initial_condition_latent, Autoencoder +from .enums import NextStep, Result +from .physics import Physics +from .latent_dynamics import LatentDynamics +from .timing import Timer +from .param import ParameterSpace + + + +# ------------------------------------------------------------------------------------------------- +# Simulate latent dynamics +# ------------------------------------------------------------------------------------------------- + +def average_rom(autoencoder : Autoencoder, + physics : Physics, + latent_dynamics : LatentDynamics, + gp_list : list[GaussianProcessRegressor], + param_grid : np.ndarray): + """ + This function simulates the latent dynamics for a collection of testing parameters by using + the mean of the posterior distribution for each coefficient's posterior distribution. + Specifically, for each parameter combination, we determine the mean of the posterior + distribution for each coefficient. We then use this mean to simulate the latent dynamics + forward in time (starting from the latent encoding of the fom initial condition for that + combination of coefficients). -def average_rom(autoencoder, physics, latent_dynamics, gp_dictionary, param_grid): + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- - if (param_grid.ndim == 1): - param_grid = param_grid.reshape(1, -1) - n_test = param_grid.shape[0] + autoencoder: The actual autoencoder object that we use to map the ICs into the latent space. + + physics: A "Physics" object that stores the datasets for each parameter combination. + + latent_dynamics: A LatentDynamics object which describes how we specify the dynamics in the + Autoencoder's latent space. + + gp_list: a list of trained GP regressor objects. The number of elements in this list should + match the number of columns in param_grid. The i'th element of this list is a GP regressor + object that predicts the i'th coefficient. + + param_grid: A 2d numpy.ndarray object of shape (number of parameter combination) x (number of + parameters). The i,j element of this array holds the value of the j'th parameter in the i'th + combination of parameters. 
- Z0 = initial_condition_latent(param_grid, physics, autoencoder) - pred_mean, _ = eval_gp(gp_dictionary, param_grid) + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + A 3d numpy ndarray whose i, j, k element holds the k'th component of the j'th time step of + the solution to the latent dynamics when we use the latent encoding of the initial condition + from the i'th combination of parameter values + """ + + # The param grid needs to be two dimensional, with the first axis corresponding to which + # instance of the parameter values we are using. If there is only one parameter, it may be 1d. + # We can fix that by adding on an axis with size 1. + if (param_grid.ndim == 1): + param_grid = param_grid.reshape(1, -1) - Zis = np.zeros([n_test, physics.nt, autoencoder.n_z]) - for i in range(n_test): + # Now fetch the number of combinations of parameter values. + n_param : int = param_grid.shape[0] + + # For each parameter in param_grid, fetch the corresponding initial condition and then encode + # it. This gives us a list whose i'th element holds the encoding of the i'th initial condition. + Z0 : list[np.ndarray] = initial_condition_latent(param_grid, physics, autoencoder) + + # Evaluate each GP at each combination of parameter values. This returns two arrays, the + # first of which is a 2d array whose i,j element specifies the mean of the posterior + # distribution for the j'th coefficient at the i'th combination of parameter values. + pred_mean, _ = eval_gp(gp_list, param_grid) + + # For each testing parameter, cycle through the mean value of each coefficient from each + # posterior distribution. For each set of coefficients (combination of parameter values), solve + # the latent dynamics forward in time (starting from the corresponding IC value) and store the + # resulting solution frames in Zis, a 3d array whose i, j, k element holds the k'th component + # of the j'th time step fo the latent solution when we use the coefficients from the posterior + # distribution for the i'th combination of parameter values. + Zis : np.ndarray = np.zeros([n_param, physics.nt, autoencoder.n_z]) + for i in range(n_param): Zis[i] = latent_dynamics.simulate(pred_mean[i], Z0[i], physics.t_grid) + # All done! return Zis -def sample_roms(autoencoder, physics, latent_dynamics, gp_dictionary, param_grid, n_samples): - ''' - Collect n_samples of ROM trajectories on param_grid. - gp_dictionary: list of Gaussian process regressors (size of n_test) - param_grid: numpy 2d array - n_samples: integer - assert(len(gp_dictionnary) == param_grid.shape[0]) - output: np.array of size [n_test, n_samples, physics.nt, autoencoder.n_z] + +def sample_roms(autoencoder : Autoencoder, + physics : Physics, + latent_dynamics : LatentDynamics, + gp_list : list[GaussianProcessRegressor], + param_grid : np.ndarray, + n_samples : int) -> np.ndarray: ''' + This function samples the latent coefficients, solves the corresponding latent dynamics, and + then returns the resulting latent solutions. + + Specifically, for each combination of parameter values in the param_grid, we draw n_samples + samples of the latent coefficients (from the coefficient posterior distributions evaluated at + that parameter value). This gives us a set of n_samples latent dynamics coefficients. 
For each + set of coefficients, we solve the corresponding latent dynamics forward in time and store the + resulting solution frames. We do this for each sample and each combination of parameter values, + resulting in an (n_param, n_sample, n_t, n_z) array of solution frames, which is what we + return. - if (param_grid.ndim == 1): - param_grid = param_grid.reshape(1, -1) - n_test = param_grid.shape[0] + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + autoencoder: An autoencoder. We use this to map the fom IC's (stored in Physics) to the + latent space using the autoencoder's encoder. + + physics: A "Physics" object that stores the ICs for each parameter combination. + + latent_dynamics: A LatentDynamics object which describes how we specify the dynamics in the + Autoencoder's latent space. We use this to simulate the latent dynamics forward in time. + + gp_list: a list of trained GP regressor objects. The number of elements in this list should + match the number of columns in param_grid. The i'th element of this list is a GP regressor + object that predicts the i'th coefficient. + + param_grid: A 2d numpy.ndarray object of shape (number of parameter combination) x (number of + parameters). The i,j element of this array holds the value of the j'th parameter in the i'th + combination of parameters. - Z0 = initial_condition_latent(param_grid, physics, autoencoder) + n_samples: The number of samples we want to draw from each posterior distribution for each + coefficient evaluated at each combination of parameter values. + - coef_samples = [sample_coefs(gp_dictionary, param_grid[i], n_samples) for i in range(n_test)] + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + A np.array of size [n_test, n_samples, physics.nt, autoencoder.n_z]. The i, j, k, l element + holds the l'th component of the k'th frame of the solution to the latent dynamics when we use + the j'th sample of latent coefficients drawn from the posterior distribution for the i'th + combination of parameter values (i'th row of param_grid). + ''' - Zis = np.zeros([n_test, n_samples, physics.nt, autoencoder.n_z]) + # The param grid needs to be two dimensional, with the first axis corresponding to which + # instance of the parameter values we are using. If there is only one parameter, it may be 1d. + # We can fix that by adding on an axis with size 1. + if (param_grid.ndim == 1): + param_grid = param_grid.reshape(1, -1) + + # Now fetch the number of combinations of parameter values (rows of param_grid). + n_param : int = param_grid.shape[0] + + # For each parameter in param_grid, fetch the corresponding initial condition and then encode + # it. This gives us a list whose i'th element holds the encoding of the i'th initial condition. + Z0 : list[np.ndarray] = initial_condition_latent(param_grid, physics, autoencoder) + + # Now, for each combination of parameters, draw n_samples samples from the posterior + # distributions for each coefficient at that combination of parameters. We store these samples + # in a list of numpy arrays. 
The k'th list element is a (n_sample, n_coef) array whose i, j + # element stores the i'th sample from the posterior distribution for the j'th coefficient at + # the k'th combination of parameter values. + coef_samples : list[np.ndarray] = [sample_coefs(gp_list, param_grid[i], n_samples) for i in range(n_param)] + + # For each testing parameter, cycle through the samples of the coefficients for that + # combination of parameter values. For each set of coefficients, solve the corresponding latent + # dynamics forward in time and store the resulting frames in Zis. This is a 4d array whose i, + # j, k, l element holds the l'th component of the k'th frame of the solution to the latent + # dynamics when we use the j'th sample of latent coefficients drawn from the posterior + # distribution for the i'th combination of parameter values. + Zis = np.zeros([n_param, n_samples, physics.nt, autoencoder.n_z]) for i, Zi in enumerate(Zis): z_ic = Z0[i] for j, coef_sample in enumerate(coef_samples[i]): Zi[j] = latent_dynamics.simulate(coef_sample, z_ic, physics.t_grid) + # All done! return Zis -def get_fom_max_std(autoencoder, Zis): - ''' - Computes the maximum standard deviation accross the parameter space grid and finds the corresponding parameter location +def get_fom_max_std(autoencoder : Autoencoder, Zis : np.ndarray) -> int: + """ + Computes the maximum standard deviation across the trajectories in Zis and returns the + corresponding parameter index. Specifically, Zis is a 4d tensor of shape (n_test, n_samples, + n_t, n_z). The first axis specifies which parameter combination we're using. For each + combination of parameters, we assume that we drew n_samples of the posterior distribution of + the coefficients at that parameter value, simulated the corresponding dynamics for n_t time + steps, and then recorded the results in Zis[i]. Thus, Zis[i, j, k, :] represents the k'th + time step of the solution to the latent dynamics when we use the coefficients from the j'th + sample of the posterior distribution for the i'th set of parameters. + + Let i \in {1, 2, ... , n_test} and k \in {1, 2, ... , n_t}. For each j, we map the k'th frame + of the j'th solution trajectory for the i'th parameter combination (Zi[i, j, k, :]) to a fom + frame. We do this for each j (the set of samples), which gives us a collection of n_sample + fom frames, representing samples of the distribution of fom frames at the k'th time step + when we use the posterior distribution for the i'th set of parameters. For each l \in {1, 2, + ... , n_fom}, we compute the STD of the set of l'th components of these n_sample fom frames. + We do this for each i and k and then figure out which i, k, l combination gives the largest + STD. We return the corresponding i index. + - ''' - # TODO(kevin): currently this evaluate pointwise maximum standard deviation. + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + autoencoder: The autoencoder. We assume the solved dynamics (whose frames are stored in Zis) + take place in the autoencoder's latent space. We use this to decode the solution frames. + + Zis: A 4d numpy array of shape (n_test, n_samples, n_t, n_z) whose i, j, k, l element holds + the l'th component of the k'th frame of the solution to the latent dynamics when we use the + j'th sample of latent coefficients drawn from the posterior distribution for the i'th testing + parameter. 
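    For example (illustrative shapes only): if Zis has shape (10, 20, 100, 5), then for each of
    the 10 candidate parameters we decode its 20 sampled latent trajectories, take the standard
    deviation over those 20 samples at every (time step, fom component) pair, and return the
    index of the candidate whose largest such standard deviation is biggest.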
+ + + + ----------------------------------------------------------------------------------------------- + Returns: + ----------------------------------------------------------------------------------------------- + + An integer. The index of the testing parameter that gives the largest standard deviation. + Specifically, for each testing parameter, we compute the STD of each component of the fom + solution at each frame generated by samples from the posterior coefficient distribution for + that parameter. We compute the maximum of these STDs and pair that number with the parameter. + We then return the index of the parameter whose corresponding maximum std (the number we pair + with it) is greatest. + """ + # TODO(kevin): currently this evaluate point-wise maximum standard deviation. # is this a proper metric? we might want to consider an average, or L2 norm of std. - max_std = 0 - for m, Zi in enumerate(Zis): - Z_m = torch.Tensor(Zi) - X_pred_m = autoencoder.decoder(Z_m).detach().numpy() - X_pred_m_std = X_pred_m.std(0) - max_std_m = X_pred_m_std.max() + max_std : float = 0.0 + # Cycle through the testing parameters. + for m, Zi in enumerate(Zis): + # Zi is a 3d tensor of shape (n_samples, n_t, n_z), where n_samples is the number of + # samples of the posterior distribution per parameter, n_t is the number of time steps in + # the latent dynamics solution, and n_z is the dimension of the latent space. The i,j,k + # element of Zi is the k'th component of the j'th frame of the solution to the latent + # dynamics when the latent dynamics uses the i'th set of sampled parameter values. + Z_m : torch.Tensor = torch.Tensor(Zi) + + # Now decode the frames. + X_pred_m : np.ndarray = autoencoder.decoder(Z_m).detach().numpy() + + # Compute the standard deviation across the sample axis. This gives us an array of shape + # (n_t, n_fom) whose i,j element holds the (sample) standard deviation of the j'th component + # of the i'th frame of the fom solution. In this case, the sample distribution consists of + # the set of j'th components of i'th frames of fom solutions (one for each sample of the + # coefficient posterior distributions). + X_pred_m_std : np.ndarray = X_pred_m.std(0) + + # Now compute the maximum standard deviation across frames/fom components. + max_std_m : np.float32 = X_pred_m_std.max() + + # If this is bigger than the biggest std we have seen so far, update the maximum. if max_std_m > max_std: - m_index = m - max_std = max_std_m + m_index : int = m + max_std : float = max_std_m + # Report the index of the testing parameter that gave the largest maximum std. return m_index + + +# ------------------------------------------------------------------------------------------------- +# BayesianGLaSDI class +# ------------------------------------------------------------------------------------------------- + # move optimizer parameters to device -def optimizer_to(optim, device): +def optimizer_to(optim : Optimizer, device : str) -> None: + """ + This function moves an optimizer object to a specific device. + + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + optim: The optimizer whose device we want to change. + + device: The device we want to move optim onto. 
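    For example (illustrative): after loading a checkpointed Adam optimizer on the cpu, calling
    optimizer_to(optim, 'cuda') moves its state tensors (e.g. the exponential moving averages
    that Adam keeps per parameter) to the GPU so that subsequent steps match the model's device.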
+ + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + Nothing. + """ + + # Cycle through the optimizer's parameters. for param in optim.state.values(): # Not sure there are any global tensors in the state dict if isinstance(param, torch.Tensor): @@ -88,46 +314,89 @@ def optimizer_to(optim, device): if subparam._grad is not None: subparam._grad.data = subparam._grad.data.to(device) -class BayesianGLaSDI: - X_train = torch.Tensor([]) - X_test = torch.Tensor([]) - - def __init__(self, physics, autoencoder, latent_dynamics, param_space, config): - - ''' - - This class runs a full GPLaSDI training. It takes into input the autoencoder defined as a PyTorch object and the - dictionnary containing all the training parameters. - The "train" method with run the active learning training loop, compute the reconstruction and SINDy loss, train the GPs, - and sample a new FOM data point. - - ''' - self.autoencoder = autoencoder - self.latent_dynamics = latent_dynamics - self.physics = physics - self.param_space = param_space - self.timer = Timer() - self.n_samples = config['n_samples'] - self.lr = config['lr'] - self.n_iter = config['n_iter'] # number of iterations for one train and greedy sampling - self.max_iter = config['max_iter'] # maximum iterations for overall training - self.max_greedy_iter = config['max_greedy_iter'] # maximum iterations for greedy sampling - self.ld_weight = config['ld_weight'] - self.coef_weight = config['coef_weight'] - - self.optimizer = torch.optim.Adam(autoencoder.parameters(), lr = self.lr) - self.MSE = torch.nn.MSELoss() - - self.path_checkpoint = config['path_checkpoint'] - self.path_results = config['path_results'] +class BayesianGLaSDI: + X_train : torch.Tensor = torch.Tensor([]) + X_test : torch.Tensor = torch.Tensor([]) + + def __init__(self, + physics : Physics, + autoencoder : Autoencoder, + latent_dynamics : LatentDynamics, + param_space : ParameterSpace, + config : dict): + """ + This class runs a full GPLaSDI training. As input, it takes the autoencoder defined as a + torch.nn.Module object, a Physics object to recover fom ICs + information on the time + discretization, a + + The "train" method runs the active learning training loop, computes the reconstruction and + SINDy loss, trains the GPs, and samples a new FOM data point. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + physics: A "Physics" object that we use to fetch the fom initial conditions (which we + encode into latent ICs). Each Physics object has + a corresponding PDE with parameters, and a way to generate a solution to that equation + given a particular set of parameter values (and an IC, BCs). We use this object to generate + fom solutions which we then use to train the autoencoder/latent dynamics. + + Autoencoder: An autoencoder object that we use to compress the fom state to a reduced, + latent state. + + latent_dynamics: A LatentDynamics object which describes how we specify the dynamics in the + Autoencoder's latent space. + + param_space: A Parameter space object which holds the set of testing and training + parameters. + + config: A dictionary housing the settings we wna to use to train the model on + the data generated by physics. 
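        An illustrative example with hypothetical values (the keys are the ones this class reads;
        the values are assumptions, not recommendations):

            config = {'n_samples'       : 20,
                      'lr'              : 1e-3,
                      'n_iter'          : 200,
                      'max_iter'        : 2000,
                      'max_greedy_iter' : 2000,
                      'ld_weight'       : 0.1,
                      'coef_weight'     : 1e-6,
                      'path_checkpoint' : 'checkpoint/',
                      'path_results'    : 'results/',
                      'device'          : 'cpu'}    # optional; defaults to 'cpu'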
+ + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ + + self.autoencoder = autoencoder + self.latent_dynamics = latent_dynamics + self.physics = physics + self.param_space = param_space + + # Initialize a timer object. We will use this while training. + self.timer = Timer() + + # Extract training/loss hyperparameters from the configuration file. + self.n_samples : int = config['n_samples'] # Number of samples to draw per coefficient per combination of parameters + self.lr : float = config['lr'] # Learning rate for the optimizer. + self.n_iter : int = config['n_iter'] # Number of iterations for one train and greedy sampling + self.max_iter : int = config['max_iter'] # Maximum iterations for overall training + self.max_greedy_iter : int = config['max_greedy_iter'] # Maximum iterations for greedy sampling + self.ld_weight : float = config['ld_weight'] # Weight of the SINDy loss in the loss function. \beta_2 in the paper. + self.coef_weight : float = config['coef_weight'] # Weight of the norm of matrix of latent dynamics coefficients. \beta_3 in the paper. + + # Set up the optimizer and loss function. + self.optimizer : Optimizer = torch.optim.Adam(autoencoder.parameters(), lr = self.lr) + self.MSE = torch.nn.MSELoss() + + # Set paths for checkpointing. + self.path_checkpoint : str = config['path_checkpoint'] + self.path_results : str = config['path_results'] + + # Make sure the checkpoints and results directories exist. from os.path import dirname from pathlib import Path - Path(dirname(self.path_checkpoint)).mkdir(parents=True, exist_ok=True) - Path(dirname(self.path_results)).mkdir(parents=True, exist_ok=True) + Path(dirname(self.path_checkpoint)).mkdir( parents = True, exist_ok = True) + Path(dirname(self.path_results)).mkdir( parents = True, exist_ok = True) + # Set the device to train on. We default to cpu. device = config['device'] if 'device' in config else 'cpu' if (device == 'cuda'): assert(torch.cuda.is_available()) @@ -138,65 +407,121 @@ def __init__(self, physics, autoencoder, latent_dynamics, param_space, config): else: self.device = 'cpu' - self.best_loss = np.Inf - self.best_coefs = None - self.restart_iter = 0 + # Set up variables to aide checkpointing. + self.best_loss : float = np.inf # The lowest testing loss we have found so far + self.best_coefs : np.ndarray = None # The best coefficients from the iteration with lowest testing loss + self.restart_iter : int = 0 # Iteration number at the end of the last training period + + # Set placeholder tensors to hold the testing and training data. We expect to set up + # X_train to be a tensor of shape (Np, Nt, Nx[0], ... , Nx[Nd - 1]), where Np is the number + # of parameter combinations in the training set, Nt is the number of time steps per fom + # solution, and Nx[0], ... , Nx[Nd - 1] represent the number of steps along the spatial + # axes. X_test has an analogous shape, but it's leading dimension has a size matching the + # number of combinations of parameters in the testing set. + self.X_train : torch.Tensor = torch.Tensor([]) + self.X_test : torch.Tensor = torch.Tensor([]) + + # All done! + return - self.X_train = torch.Tensor([]) - self.X_test = torch.Tensor([]) - return - def train(self): + def train(self) -> None: + """ + Runs a round of training on the autoencoder. 
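        Each iteration minimizes

            loss = loss_ae + (ld_weight / n_train) * loss_ld + (coef_weight / n_train) * loss_coef

        i.e. the autoencoder reconstruction (MSE) loss plus the weighted latent dynamics (SINDy)
        and coefficient-norm losses returned by the LatentDynamics object's calibrate method.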
+ + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + None! + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ + + # Make sure we have at least one training data point (the 0 axis of X_Train corresponds + # which combination of training parameters we use). assert(self.X_train.size(0) > 0) assert(self.X_train.size(0) == self.param_space.n_train()) - device = self.device - autoencoder_device = self.autoencoder.to(device) - X_train_device = self.X_train.to(device) + # Map everything to self's device. + device : str = self.device + autoencoder_device : Autoencoder = self.autoencoder.to(device) + X_train_device : torch.Tensor = self.X_train.to(device) + # Make sure the checkpoints and results directories exist. from pathlib import Path - Path(self.path_checkpoint).mkdir(parents=True, exist_ok=True) - Path(self.path_results).mkdir(parents=True, exist_ok=True) - - ps = self.param_space - n_train = ps.n_train() - ld = self.latent_dynamics + Path(self.path_checkpoint).mkdir( parents = True, exist_ok = True) + Path(self.path_results).mkdir( parents = True, exist_ok = True) - ''' - determine number of iterations. - Perform n_iter iterations until overall iterations hit max_iter. - ''' - next_iter = min(self.restart_iter + self.n_iter, self.max_iter) + ps : ParameterSpace = self.param_space + n_train : int = ps.n_train() + ld : LatentDynamics = self.latent_dynamics + # Determine number of iterations we should run in this round of training. + next_iter : int = min(self.restart_iter + self.n_iter, self.max_iter) + + # Run the iterations! for iter in range(self.restart_iter, next_iter): + # Begin timing the current training step. self.timer.start("train_step") + # Zero out the gradients. self.optimizer.zero_grad() - Z = autoencoder_device.encoder(X_train_device) - X_pred = autoencoder_device.decoder(Z) - Z = Z.cpu() - loss_ae = self.MSE(X_train_device, X_pred) - coefs, loss_ld, loss_coef = ld.calibrate(Z, self.physics.dt, compute_loss=True, numpy=True) - max_coef = np.abs(coefs).max() + # ------------------------------------------------------------------------------------- + # Forward pass + + # Run the forward pass + Z : torch.Tensor = autoencoder_device.encoder(X_train_device) + X_pred : torch.Tensor = autoencoder_device.decoder(Z) + Z : torch.Tensor = Z.cpu() + + # Compute the autoencoder loss. + loss_ae : torch.Tensor = self.MSE(X_train_device, X_pred) + # Compute the latent dynamics and coefficient losses. Also fetch the current latent + # dynamics coefficients for each training point. The latter is stored in a 3d array + # called "coefs" of shape (n_train, N_z, N_l), where N_{\mu} = n_train = number of + # training parameter combinations, N_z = latent space dimension, and N_l = number of + # terms in the SINDy library. + coefs, loss_ld, loss_coef = ld.calibrate(Z, self.physics.dt, compute_loss = True, numpy = True) + max_coef : np.float32 = np.abs(coefs).max() + + # Compute the final loss. loss = loss_ae + self.ld_weight * loss_ld / n_train + self.coef_weight * loss_coef / n_train + + # ------------------------------------------------------------------------------------- + # Backward Pass + + # Run back propagation and update the model parameters. 
loss.backward() self.optimizer.step() + # Check if we hit a new minimum loss. If so, make a checkpoint, record the loss and + # the iteration number. if loss.item() < self.best_loss: torch.save(autoencoder_device.cpu().state_dict(), self.path_checkpoint + '/' + 'checkpoint.pt') - autoencoder_device = self.autoencoder.to(device) - self.best_coefs = coefs - self.best_loss = loss.item() + autoencoder_device : Autoencoder = self.autoencoder.to(device) + self.best_coefs : np.ndarray = coefs + self.best_loss : float = loss.item() + + # ------------------------------------------------------------------------------------- + # Report Results from this iteration + # Report the current iteration number and losses print("Iter: %05d/%d, Loss: %3.10f, Loss AE: %3.10f, Loss LD: %3.10f, Loss COEF: %3.10f, max|c|: %04.1f, " % (iter + 1, self.max_iter, loss.item(), loss_ae.item(), loss_ld.item(), loss_coef.item(), max_coef), end = '') + # If there are fewer than 6 training examples, report the set of parameter combinations. if n_train < 6: print('Param: ' + str(np.round(ps.train_space[0, :], 4)), end = '') @@ -204,80 +529,192 @@ def train(self): print(', ' + str(np.round(ps.train_space[i, :], 4)), end = '') print(', ' + str(np.round(ps.train_space[-1, :], 4))) + # Otherwise, report the final 6 parameter combinations. else: print('Param: ...', end = '') for i in range(5): print(', ' + str(np.round(ps.train_space[-6 + i, :], 4)), end = '') print(', ' + str(np.round(ps.train_space[-1, :], 4))) + # We have finished a training step, stop the timer. self.timer.end("train_step") + # We are ready to wrap up the training procedure. self.timer.start("finalize") + # Now that we have completed another round of training, update the restart iteration. self.restart_iter += self.n_iter + # Recover the autoencoder + coefficients which attained the lowest loss. If we recorded + # our bess loss in this round of training, then we replace the autoencoder's parameters + # with those from the iteration that got the best loss. Otherwise, we use the current + # set of coefficients and serialize the current model. if ((self.best_coefs is not None) and (self.best_coefs.shape[0] == n_train)): - state_dict = torch.load(self.path_checkpoint + '/' + 'checkpoint.pt') + state_dict = torch.load(self.path_checkpoint + '/' + 'checkpoint.pt') self.autoencoder.load_state_dict(state_dict) else: - self.best_coefs = coefs + self.best_coefs : np.ndarray = coefs torch.save(autoencoder_device.cpu().state_dict(), self.path_checkpoint + '/' + 'checkpoint.pt') + # Report timing information. self.timer.end("finalize") self.timer.print() + # All done! return - - def get_new_sample_point(self): + + + + def get_new_sample_point(self) -> np.ndarray: + """ + This function uses a greedy process to sample a new parameter value. Specifically, it runs + through each combination of parameters in in self.param_space. For the i'th combination of + parameters, we generate a collection of samples of the coefficients in the latent dynamics. + We draw the k'th sample of the j'th coefficient from the posterior distribution for the + j'th coefficient at the i'th combination of parameters. We map the resulting solution back + into the real space and evaluate the standard deviation of the fom frames. We return the + combination of parameters which engenders the largest standard deviation (see the function + get_fom_max_std). 
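        For example (illustrative): with 4 combinations of testing parameters and n_samples = 20,
        we build 4 * 20 sampled latent trajectories, decode them with the autoencoder, and select
        the testing parameter whose decoded ensemble exhibits the largest point-wise standard
        deviation; that parameter is returned as the next candidate for a fom solve.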
+ + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + None! + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + a 2d numpy ndarray object of shape (1, n_param) whose (0, j) element holds the value of + the j'th parameter in the new sample. + """ + + self.timer.start("new_sample") - assert(self.X_test.size(0) > 0) - assert(self.X_test.size(0) == self.param_space.n_test()) + assert(self.X_test.size(0) > 0) + assert(self.X_test.size(0) == self.param_space.n_test()) assert(self.best_coefs.shape[0] == self.param_space.n_train()) - coefs = self.best_coefs + coefs : np.ndarray = self.best_coefs print('\n~~~~~~~ Finding New Point ~~~~~~~') # TODO(kevin): william, this might be the place for new sampling routine. - ae = self.autoencoder.cpu() - ps = self.param_space - n_test = ps.n_test() + # Move the model to the cpu (this is where all the GP stuff happens) and load the model + # from the last checkpoint. This should be the one that obtained the best loss so far. + # Remember that coefs should specify the coefficients from that iteration. + ae : Autoencoder = self.autoencoder.cpu() + ps : ParameterSpace = self.param_space + n_test : int = ps.n_test() ae.load_state_dict(torch.load(self.path_checkpoint + '/' + 'checkpoint.pt')) - Z0 = initial_condition_latent(ps.test_space, self.physics, ae) - - gp_dictionnary = fit_gps(ps.train_space, coefs) - - coef_samples = [sample_coefs(gp_dictionnary, ps.test_space[i], self.n_samples) for i in range(n_test)] - - Zis = np.zeros([n_test, self.n_samples, self.physics.nt, ae.n_z]) + # Map the initial conditions for the fom to initial conditions in the latent space. + Z0 : list[np.ndarray] = initial_condition_latent(ps.test_space, self.physics, ae) + + # Train the GPs on the training data, get one GP per latent space coefficient. + gp_list : list[GaussianProcessRegressor] = fit_gps(ps.train_space, coefs) + + # For each combination of parameter values in the testing set, for each coefficient, + # draw a set of samples from the posterior distribution for that coefficient evaluated at + # the testing parameters. We store the samples for a particular combination of parameter + # values in a 2d numpy.ndarray of shape (n_sample, n_coef), whose i, j element holds the + # i'th sample of the j'th coefficient. We store the arrays for different parameter values + # in a list of length (number of combinations of parameters in the testing set). + coef_samples : list[np.ndarray] = [sample_coefs(gp_list, ps.test_space[i], self.n_samples) for i in range(n_test)] + + # Now, solve the latent dynamics forward in time for each set of coefficients in + # coef_samples. There are n_test combinations of parameter values, and we have n_samples + # sets of coefficients for each combination of parameter values. For each of those, we want + # to solve the corresponding latent dynamics for n_t time steps. Each one of the frames + # in that solution live in \mathbb{R}^{n_z}. 
Thus, we need to store the results in a 4d + # array of shape (n_test, n_samples, n_t, n_z) whose i, j, k, l element holds the l'th + # component of the k'th frame of the solution to the latent dynamics when we use the + # j'th sample of the coefficients for the i'th testing parameter value and when the latent + # dynamics uses the encoding of the i'th fom IC as its IC. + Zis : np.ndarray = np.zeros([n_test, self.n_samples, self.physics.nt, ae.n_z]) for i, Zi in enumerate(Zis): z_ic = Z0[i] for j, coef_sample in enumerate(coef_samples[i]): Zi[j] = self.latent_dynamics.simulate(coef_sample, z_ic, self.physics.t_grid) - m_index = get_fom_max_std(ae, Zis) + # Find the index of the parameter with the largest std. + m_index : int = get_fom_max_std(ae, Zis) - new_sample = ps.test_space[m_index, :].reshape(1, -1) + # We have found the testing parameter we want to add to the training set. Fetch it, then + # stop the timer and return the parameter. + new_sample : np.ndarray = ps.test_space[m_index, :].reshape(1, -1) print('New param: ' + str(np.round(new_sample, 4)) + '\n') - self.timer.end("new_sample") + + # All done! return new_sample - - def export(self): - dict_ = {'X_train': self.X_train, 'X_test': self.X_test, 'lr': self.lr, 'n_iter': self.n_iter, - 'n_samples' : self.n_samples, 'best_coefs': self.best_coefs, 'max_iter': self.max_iter, - 'max_iter': self.max_iter, 'ld_weight': self.ld_weight, 'coef_weight': self.coef_weight, - 'restart_iter': self.restart_iter, 'timer': self.timer.export(), 'optimizer': self.optimizer.state_dict() - } + + + + def export(self) -> dict: + """ + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + A dictionary housing most of the internal variables in self. You can pass this dictionary + to self (after initializing it using ParameterSpace, Autoencoder, and LatentDynamics + objects) to make a GLaSDI object whose internal state matches that of self. + """ + + dict_ = {'X_train' : self.X_train, + 'X_test' : self.X_test, + 'lr' : self.lr, + 'n_iter' : self.n_iter, + 'n_samples' : self.n_samples, + 'best_coefs' : self.best_coefs, + 'max_iter' : self.max_iter, + 'max_iter' : self.max_iter, + 'ld_weight' : self.ld_weight, + 'coef_weight' : self.coef_weight, + 'restart_iter' : self.restart_iter, + 'timer' : self.timer.export(), + 'optimizer' : self.optimizer.state_dict()} return dict_ - - def load(self, dict_): - self.X_train = dict_['X_train'] - self.X_test = dict_['X_test'] - self.best_coefs = dict_['best_coefs'] - self.restart_iter = dict_['restart_iter'] + + + + def load(self, dict_ : dict) -> None: + """ + Modifies self's internal state to match the one whose export method generated the dict_ + dictionary. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + dict_: This should be a dictionary returned by calling the export method on another + GLaSDI object. We use this to make self hav the same internal state as the object that + generated dict_. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ + + # Extract instance variables from dict_. 
+ self.X_train : torch.Tensor = dict_['X_train'] + self.X_test : torch.Tensor = dict_['X_test'] + self.best_coefs : np.ndarray = dict_['best_coefs'] + self.restart_iter : int = dict_['restart_iter'] + + # Load the timer / optimizer. self.timer.load(dict_['timer']) self.optimizer.load_state_dict(dict_['optimizer']) if (self.device != 'cpu'): optimizer_to(self.optimizer, self.device) - return \ No newline at end of file + + # All done! + return + \ No newline at end of file diff --git a/src/lasdi/inputs.py b/src/lasdi/inputs.py index 0a654b6..fa030bb 100644 --- a/src/lasdi/inputs.py +++ b/src/lasdi/inputs.py @@ -1,67 +1,159 @@ +# ------------------------------------------------------------------------------------------------- +# Imports +# ------------------------------------------------------------------------------------------------- + from warnings import warn -verbose = False +verbose : bool = False + + +# ------------------------------------------------------------------------------------------------- +# Input Parser class +# ------------------------------------------------------------------------------------------------- class InputParser: - dict_ = None - name = "" + """ + A InputParser objects acts as a wrapper around a dictionary of settings. Thus, each setting is + a key and the corresponding value is the setting's value. Because one setting may itself be + a dictionary (we often group settings; each group has a name but several constituent settings), + the underlying dictionary is structured as a sequence of nested dictionaries. This class allows + the user to select a specific setting from that structure by specifying (via a list of strings) + where in that nested structure the desired setting lives. + """ + dict_ : dict = None + name : str = "" + + + + def __init__(self, dict : dict, name : str = "") -> None: + """" + Initializes an InputParser object by setting the underlying dictionary of settings as an + attribute. + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + dict: The dictionary of settings. To avoid the risk of the user accidentally changing one + of the settings after wrapping it, we store a deep copy of dict in self. + """ - def __init__(self, dict, name = ""): + # A shallow copy could cause issues if the user changes dict's keys/values after + # initializing this object. We store a deep copy to avoid this risk. from copy import deepcopy self.dict_ = deepcopy(dict) + self.name = name - return - def getInput(self, keys, fallback=None, datatype=None): + + + def getInput(self, keys : list, fallback = None, datatype = None): ''' - Find the value corresponding to the list of keys. - If the specified keys do not exist, use the fallback value. - If the fallback value does not exist, returns an error. - If the datatype is specified, enforce the output value has the right datatype. + A InputParser object acts as a wrapper around a dictionary of settings. That is, self.dict_ + is structured as a nested family of dictionaries. Each setting corresponds to a key in + self.dict_. The setting's value is the corresponding value in self.dict_. In many cases, + a particular setting may be nested within others. That is, a setting's value may itself be + another dictionary housing various sub-settings. This function allows us to fetch a + specific setting from this nested structure. + + Specifically, we specify a list of strings. 
keys[0] should be a key in self.dict_ + If so, we set val = self.dict_[keys[0]]. If there are more keys, then val should be a + dictionary and keys[1] should be a key in this dictionary. In this case, we replace val + with val[key[1]] and so on. This continues until we have exhausted all keys. There is one + important exception: + + If at some point in the process, there are more keys but val is not a dictionary, or if + there are more keys and val is a dictionary but the next key is not a key in that + dictionary, then we return the fallback value. If the fallback value does not exist, + returns an error. + + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + keys: A list of keys we want to fetch from self.dict. keys[0] should be a key in self.dict_ + If so, we set val = self.dict_[keys[0]]. If there are more keys, then val should be a + dictionary and keys[1] should be a key in this dictionary. In this case, we replace val + with val[key[1]] and so on. This continues until we have exhausted all keys. + + fallback: A sort of default value. If at some point, val is not a dictionary (and there are + more keys) or val is a dictionary but the next key is not a valid key in that dictionary, + then we return the fallback value. + + datatype: If not None, then we require that the final val has this datatype. If the final + val does not have the desired datatype, we raise an exception. + + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + The final val value as outlined by the process described above. ''' + + # Concatenate the keys together. This is for debugging purposes. keyString = "" for key_ in keys: keyString += key_ + "/" + # Begin by initializing val to self.dict_ val = self.dict_ + + # Cycle through the keys for key in keys: + + # Check if the current key is a key in val (this assumes val is a dictionary). If so, + # update val. Otherwise, return the fallback (if it is present) or raise an exception. if key in val: val = val[key] elif (fallback != None): return fallback else: raise RuntimeError("%s does not exist in the input dictionary %s!" % (keyString, self.name)) - + + # Check if the fallback and final val have the same type. if (fallback != None): if (type(val) != type(fallback)): warn("%s does not match the type with the fallback value %s!" % (str(type(val)), str(type(fallback)))) - + + # Check thast the final val matches the desired datatype if (datatype != None): if (type(val) != datatype): raise RuntimeError("%s does not match the specified datatype %s!" % (str(type(val)), str(datatype))) else: if verbose: warn("InputParser Warning: datatype is not checked.\n key: %s\n value type: %s" % (keys, type(val))) + + # All done! return val -def getDictFromList(list_, inputDict): - ''' - get a dict with {key: val} from a list of dicts - NOTE: it returns only the first item in the list, - even if the list has more than one dict with {key: val}. - ''' - dict_ = None - for item in list_: - isDict = True - for key, val in inputDict.items(): - if key not in item: - isDict = False - break - if (item[key] != val): - isDict = False - break - if (isDict): - dict_ = item - break - if (dict_ == None): - raise RuntimeError('Given list does not have a dict with {%s: %s}!' 
% (key, val)) - return dict_ + + +# ------------------------------------------------------------------------------------------------- +# Unused: getDictFromList function +# ------------------------------------------------------------------------------------------------- + +#def getDictFromList(list_, inputDict): +# ''' +# get a dict with {key: val} from a list of dicts +# NOTE: it returns only the first item in the list, +# even if the list has more than one dict with {key: val}. +# ''' +# dict_ = None +# for item in list_: +# isDict = True +# for key, val in inputDict.items(): +# if key not in item: +# isDict = False +# break +# if (item[key] != val): +# isDict = False +# break +# if (isDict): +# dict_ = item +# break +# if (dict_ == None): +# raise RuntimeError('Given list does not have a dict with {%s: %s}!' % (key, val)) +# return dict_ \ No newline at end of file diff --git a/src/lasdi/latent_space.py b/src/lasdi/latent_space.py index fdf8fef..7f64f65 100644 --- a/src/lasdi/latent_space.py +++ b/src/lasdi/latent_space.py @@ -1,175 +1,463 @@ -import torch -import numpy as np +# ------------------------------------------------------------------------------------------------- +# Imports and setup +# ------------------------------------------------------------------------------------------------- + +import torch +import numpy as np + +from .physics import Physics # activation dict -act_dict = {'ELU': torch.nn.ELU, - 'hardshrink': torch.nn.Hardshrink, - 'hardsigmoid': torch.nn.Hardsigmoid, - 'hardtanh': torch.nn.Hardtanh, - 'hardswish': torch.nn.Hardswish, - 'leakyReLU': torch.nn.LeakyReLU, - 'logsigmoid': torch.nn.LogSigmoid, - 'multihead': torch.nn.MultiheadAttention, - 'PReLU': torch.nn.PReLU, - 'ReLU': torch.nn.ReLU, - 'ReLU6': torch.nn.ReLU6, - 'RReLU': torch.nn.RReLU, - 'SELU': torch.nn.SELU, - 'CELU': torch.nn.CELU, - 'GELU': torch.nn.GELU, - 'sigmoid': torch.nn.Sigmoid, - 'SiLU': torch.nn.SiLU, - 'mish': torch.nn.Mish, - 'softplus': torch.nn.Softplus, - 'softshrink': torch.nn.Softshrink, - 'tanh': torch.nn.Tanh, - 'tanhshrink': torch.nn.Tanhshrink, - 'threshold': torch.nn.Threshold, - } - -def initial_condition_latent(param_grid, physics, autoencoder): - - ''' - - Outputs the initial condition in the latent space: Z0 = encoder(U0) - - ''' - - n_param = param_grid.shape[0] - Z0 = [] - - sol_shape = [1, 1] + physics.qgrid_size +act_dict = {'ELU' : torch.nn.ELU, + 'hardshrink' : torch.nn.Hardshrink, + 'hardsigmoid' : torch.nn.Hardsigmoid, + 'hardtanh' : torch.nn.Hardtanh, + 'hardswish' : torch.nn.Hardswish, + 'leakyReLU' : torch.nn.LeakyReLU, + 'logsigmoid' : torch.nn.LogSigmoid, + 'PReLU' : torch.nn.PReLU, + 'ReLU' : torch.nn.ReLU, + 'ReLU6' : torch.nn.ReLU6, + 'RReLU' : torch.nn.RReLU, + 'SELU' : torch.nn.SELU, + 'CELU' : torch.nn.CELU, + 'GELU' : torch.nn.GELU, + 'sigmoid' : torch.nn.Sigmoid, + 'SiLU' : torch.nn.SiLU, + 'mish' : torch.nn.Mish, + 'softplus' : torch.nn.Softplus, + 'softshrink' : torch.nn.Softshrink, + 'tanh' : torch.nn.Tanh, + 'tanhshrink' : torch.nn.Tanhshrink, + 'threshold' : torch.nn.Threshold,} + + + +# ------------------------------------------------------------------------------------------------- +# initial_conditions_latent function +# ------------------------------------------------------------------------------------------------- + +def initial_condition_latent(param_grid : np.ndarray, + physics : Physics, + autoencoder : torch.nn.Module) -> list[np.ndarray]: + """ + This function maps a set of initial conditions for the fom to initial conditions for the + 
latent space dynamics. Specifically, we take in a set of possible parameter values. For each + set of parameter values, we recover the fom IC (from physics), then map this fom IC to a + latent space IC (by encoding it using the autoencoder). We do this for each parameter + combination and then return a list housing the latent space ICs. + + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + param_grid: A 2d numpy.ndarray object of shape (number of parameter combination) x (number of + parameters). The i,j element of this array holds the value of the j'th parameter in the i'th + combination of parameters. + + physics: A "Physics" object that, among other things, stores the IC for each combination of + parameters. + + autoencoder: The actual autoencoder object that we use to map the ICs into the latent space. + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + A list of numpy ndarray objects whose i'th element holds the latent space initial condition + for the i'th set of parameters in the param_grid. That is, if we let U0_i denote the fom IC for + the i'th set of parameters, then the i'th element of the returned list is Z0_i = encoder(U0_i). + """ + + # Figure out how many parameters we are testing at. + n_param : int = param_grid.shape[0] + Z0 : list[np.ndarray] = [] + sol_shape : list[int] = [1, 1] + physics.qgrid_size + + # Cycle through the parameters. for i in range(n_param): - u0 = physics.initial_condition(param_grid[i]) - u0 = u0.reshape(sol_shape) + # TODO(kevin): generalize parameter class. + + # Fetch the IC for the i'th set of parameters. Then map it to a tensor. + u0 : np.ndarray = physics.initial_condition(param_grid[i]) + u0 = u0.reshape(sol_shape) u0 = torch.Tensor(u0) - z0 = autoencoder.encoder(u0) - z0 = z0[0, 0, :].detach().numpy() + + # Encode the IC, then map the encoding to a numpy array. + z0 : np.ndarray = autoencoder.encoder(u0) + z0 = z0[0, 0, :].detach().numpy() + + # Append the new IC to the list of latent ICs Z0.append(z0) + # Return the list of latent ICs. return Z0 + + +# ------------------------------------------------------------------------------------------------- +# MLP class +# ------------------------------------------------------------------------------------------------- + class MultiLayerPerceptron(torch.nn.Module): + def __init__( self, + layer_sizes : list[int], + act_type : str = 'sigmoid', + reshape_index : int = None, + reshape_shape : tuple[int] = None, + threshold : float = 0.1, + value : float = 0.0) -> None: + """ + This class defines a standard multi-layer network network. 
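+        For example (an illustrative configuration, not taken from the repository), layer_sizes =
+        [100, 64, 10] with act_type = 'tanh' would give two Linear layers, 100 -> 64 and 64 -> 10,
+        with a tanh activation applied after the first layer but not after the final (output) layer.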
- def __init__(self, layer_sizes, - act_type='sigmoid', reshape_index=None, reshape_shape=None, - threshold=0.1, value=0.0, num_heads=1): - super(MultiLayerPerceptron, self).__init__() - # including input, hidden, output layers - self.n_layers = len(layer_sizes) - self.layer_sizes = layer_sizes + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- - # Linear features between layers - self.fcs = [] - for k in range(self.n_layers-1): - self.fcs += [torch.nn.Linear(layer_sizes[k], layer_sizes[k + 1])] - self.fcs = torch.nn.ModuleList(self.fcs) - self.init_weight() + layer_sizes: A list of integers specifying the widths of the layers (including the + dimensionality of the domain of each layer, as well as the co-domain of the final layer). + Suppose this list has N elements. Then the network will have N - 1 layers. The i'th layer + maps from \mathbb{R}^{layer_sizes[i]} to \mathbb{R}^{layers_sizes[i]}. Thus, the i'th + element of this list represents the domain of the i'th layer AND the co-domain of the + i-1'th layer. + + act_type: A string specifying which activation function we want to use at the end of each + layer (except the final one). We use the same activation for each layer. - # Reshape input or output layer + reshape_index: This argument specifies if we should reshape the network's input or output + (or neither). If the user specifies reshape_index, then it must be either 0 or -1. Further, + in this case, they must also specify reshape_shape (you need to specify both together). If + it is 0, then reshape_shape specifies how we reshape the input before passing it through + the network (the input to the first layer). If reshape_index is -1, then reshape_shape + specifies how we reshape the network output before returning it (the output to the last + layer). + + reshape_shape: These are lists of k integers specifying the final k dimensions of the shape + of the input to the first layer (if reshape_index == 0) or the output of the last layer + (if reshape_index == -1). You must specify this argument if and only if you specify + reshape_index. + + threshold: You should only specify this argument if you are using the "Threshold" + activation type. In this case, it specifies the threshold value for that activation (if x + is the input and x > threshold, then the function returns x. Otherwise, it returns the + value). + + value: You should only specify this argument if you are using the "Threshold" activation + type. In this case, "value" specifies the value that the function returns if the input is + below the threshold. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ + + # Run checks. + # Make sure the reshape index is either 0 (input to 1st layer) or -1 (output of last + # layer). Also make sure that that product of the dimensions in the reshape_shape match + # the input dimension for the 1st layer (if reshape_index == 0) or the output dimension of + # the last layer (if reshape_index == 1). + # + # Why do we need the later condition? Well, suppose that reshape_shape has a length of k. + # If reshape_index == 0, then we squeeze the final k dimensions of the input and feed that + # into the first layer. 
Thus, in this case, we need the last dimension of the squeezed + # array to match the input domain of the first layer. On the other hand, reshape_index == -1 + # then we reshape the final dimension of the output to match the reshape_shape. Thus, in + # both cases, we need the product of the components of reshape_shape to match a + # corresponding element of layer_sizes. assert((reshape_index is None) or (reshape_index in [0, -1])) assert((reshape_shape is None) or (np.prod(reshape_shape) == layer_sizes[reshape_index])) - self.reshape_index = reshape_index - self.reshape_shape = reshape_shape - # Initalize activation function - self.act_type = act_type - self.use_multihead = False + super(MultiLayerPerceptron, self).__init__() + + # Note that layer_sizes specifies the dimensionality of the domains and co-domains of each + # layer. Specifically, the i'th element specifies the input dimension of the i'th layer, + # while the final element specifies the dimensionality of the co-domain of the final layer. + # Thus, the number of layers is one less than the length of layer_sizes. + self.n_layers : int = len(layer_sizes) - 1; + self.layer_sizes : list[int] = layer_sizes + + # Set up the affine parts of the layers. + self.layers : list[torch.nn.Module] = []; + for k in range(self.n_layers): + self.layers += [torch.nn.Linear(layer_sizes[k], layer_sizes[k + 1])] + self.layers = torch.nn.ModuleList(self.layers) + + # Now, initialize the weight matrices and bias vectors in the affine portion of each layer. + self.init_weight() + + # Reshape input to the 1st layer or output of the last layer. + self.reshape_index : int = reshape_index + self.reshape_shape : list[int] = reshape_shape + + # Set up the activation function. Note that we need to handle the threshold activation type + # separately because it requires an extra set of parameters to initialize. + self.act_type : str = act_type if act_type == "threshold": - self.act = act_dict[act_type](threshold, value) - - elif act_type == "multihead": - self.use_multihead = True - if (self.n_layers > 3): # if you have more than one hidden layer - self.act = [] - for i in range(self.n_layers-2): - self.act += [act_dict[act_type](layer_sizes[i+1], num_heads)] - else: - self.act = [torch.nn.Identity()] # No additional activation - self.act = torch.nn.ModuleList(self.fcs) - - #all other activation functions initialized here + self.act : torch.nn.Module = act_dict[act_type](threshold, value) else: - self.act = act_dict[act_type]() + self.act : torch.nn.Module = act_dict[act_type]() + + # All done! return - def forward(self, x): + + + def forward(self, x : torch.Tensor) -> torch.Tensor: + """ + This function defines the forward pass through self. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + x: A tensor holding a batch of inputs. We pass this tensor through the network's layers + and then return the result. If self.reshape_index == 0 and self.reshape_shape has k + elements, then the final k elements of x's shape must match self.reshape_shape. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + The image of x under the network's layers. 
If self.reshape_index == -1 and + self.reshape_shape has k elements, then we reshape the output so that the final k elements + of its shape match those of self.reshape_shape. + """ + + # If the reshape_index is 0, we need to reshape x before passing it through the first + # layer. if (self.reshape_index == 0): - # make sure the input has a proper shape + # Make sure the input has a proper shape. There is a lot going on in this line; let's + # break it down. If reshape_index == 0, then we need to reshape the input, x, before + # passing it through the layers. Let's assume that reshape_shape has k elements. Then, + # we need to squeeze the final k dimensions of the input, x, so that the resulting + # tensor has a final dimension size that matches the input dimension size for the first + # layer. The check below makes sure that the final k dimensions of the input, x, match + # the stored reshape_shape. assert(list(x.shape[-len(self.reshape_shape):]) == self.reshape_shape) - # we use torch.Tensor.view instead of torch.Tensor.reshape, - # in order to avoid data copying. + + # Now that we know the final k dimensions of x have the correct shape, let's squeeze + # them into 1 dimension (so that we can pass the squeezed tensor through the first + # layer). To do this, we reshape x by keeping all but the last k dimensions of x, and + # replacing the last k with a single dimension whose size matches the dimensionality of + # the domain of the first layer. Note that we use torch.Tensor.view instead of + # torch.Tensor.reshape in order to avoid data copying. x = x.view(list(x.shape[:-len(self.reshape_shape)]) + [self.layer_sizes[self.reshape_index]]) - for i in range(self.n_layers-2): - x = self.fcs[i](x) # apply linear layer - if (self.use_multihead): - x = self.apply_attention(self, x, i) - else: - x = self.act(x) + # Pass x through the network layers (except for the final one, which has no activation + # function). + for i in range(self.n_layers - 1): + x : torch.Tensor = self.layers[i](x) # apply linear layer + x : torch.Tensor = self.act(x) # apply activation - x = self.fcs[-1](x) + # Apply the final (output) layer. + x = self.layers[-1](x) + # If the reshape_index is -1, then we need to reshape the output before returning. if (self.reshape_index == -1): - # we use torch.Tensor.view instead of torch.Tensor.reshape, - # in order to avoid data copying. + # In this case, we need to split the last dimension of x, the output of the final + # layer, to match the reshape_shape. This is precisely what the line below does. Note + # that we use torch.Tensor.view instead of torch.Tensor.reshape in order to avoid data + # copying. x = x.view(list(x.shape[:-1]) + self.reshape_shape) + # All done! return x - def apply_attention(self, x, act_idx): - x = x.unsqueeze(1) # Add sequence dimension for attention - x, _ = self.act[act_idx](x, x, x) # apply attention - x = x.squeeze(1) # Remove sequence dimension - return x - - def init_weight(self): + + + def init_weight(self) -> None: + """ + This function initializes the weight matrices and bias vectors in self's layers. It takes + no arguments and returns nothing! + """ + # TODO(kevin): support other initializations? - for fc in self.fcs: - torch.nn.init.xavier_uniform_(fc.weight) + for layer in self.layers: + torch.nn.init.xavier_uniform_(layer.weight) + torch.nn.init.zeros_(layer.bias) + + # All done! 
return + + +# ------------------------------------------------------------------------------------------------- +# Autoencoder class +# ------------------------------------------------------------------------------------------------- + class Autoencoder(torch.nn.Module): + def __init__(self, physics : Physics, config : dict) -> None: + """ + Initializes an Autoencoder object. An Autoencoder consists of two networks, an encoder, + E : \mathbb{R}^F -> \mathbb{R}^L, and a decoder, D : \mathbb{R}^L -> \marthbb{R}^F. We + assume that the dataset consists of samples of a parameterized L-manifold in + \mathbb{R}^F. The idea then is that E and D act like the inverse coordinate patch and + coordinate patch, respectively. In our case, E and D are trainable neural networks. We + try to train E and map data in \mathbb{R}^F to elements of a low dimensional latent + space (\mathbb{R}^L) which D can send back to the original data. (thus, E, and D should + act like inverses of one another). + + The Autoencoder class implements this model as a trainable torch.nn.Module object. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + physics: A "Physics" object that holds the fom solution frames. We use this object to + determine the shape of each fom solution frame. Recall that each Physics object has a + corresponding PDE. We + + config: A dictionary representing the loaded .yml configuration file. We expect it to have + the following keys/: + hidden_units: A list of integers specifying the dimension of the co-domain of each + encoder layer except for the final one. Thus, if the k'th layer maps from + \mathbb{R}^{n(k)} to \mathbb{R}^{n(k + 1)} and there are K layers (indexed 0, 1, ... , + K - 1), then hidden_units should specify n(1), ... , n(K - 1). + + latent_dimension: The dimensionality of the Autoencoder's latent space. Equivalently, + the dimensionality of the co-domain of the encoder (i.e., the dimensionality of the + co-domain of the last layer of the encoder) and the domain of the decoder (i.e., the + dimensionality of the domain of the first layer of the decoder). + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ - def __init__(self, physics, config): super(Autoencoder, self).__init__() - self.qgrid_size = physics.qgrid_size - self.space_dim = np.prod(self.qgrid_size) - hidden_units = config['hidden_units'] - n_z = config['latent_dimension'] - self.n_z = n_z - - layer_sizes = [self.space_dim] + hidden_units + [n_z] - #grab relevant initialization values from config - act_type = config['activation'] if 'activation' in config else 'sigmoid' - threshold = config["threshold"] if "threshold" in config else 0.1 - value = config["value"] if "value" in config else 0.0 - num_heads = config['num_heads'] if 'num_heads' in config else 1 - - self.encoder = MultiLayerPerceptron(layer_sizes, act_type, - reshape_index=0, reshape_shape=self.qgrid_size, - threshold=threshold, value=value, num_heads=num_heads) - - self.decoder = MultiLayerPerceptron(layer_sizes[::-1], act_type, - reshape_index=-1, reshape_shape=self.qgrid_size, - threshold=threshold, value=value, num_heads=num_heads) + # A Physics object's qgrid_size is a list of integers specifying the shape of each frame of + # the fom solution. 
If the solution is scalar valued, then this is just a list whose i'th + # element specifies the number of grid points along the i'th spatial axis. If the solution + # is vector valued, however, we prepend the dimensionality of the vector field to the list + # from the scalar list (so the 0 element represents the dimension of the vector field at + # each point). + self.qgrid_size : list[int] = physics.qgrid_size; + + # The product of the elements of qgrid_size is the number of dimensions in each fom + # solution frame. This number represents the dimensionality of the input to the encoder + # (since we pass a flattened fom frame as input). + self.space_dim : np.ndarray = np.prod(self.qgrid_size); + + # Fetch information about the domain/co-domain of each encoder layer. + hidden_units : list[int] = config['hidden_units']; + n_z : int = config['latent_dimension']; + self.n_z : int = n_z; + + # Build the "layer_sizes" argument for the MLP class. This consists of the dimensions of + # each layers' domain + the dimension of the co-domain of the final layer. + layer_sizes = [self.space_dim] + hidden_units + [n_z]; + + # Use the settings to set up the activation information for the encoder. + act_type = config['activation'] if 'activation' in config else 'sigmoid' + threshold = config["threshold"] if "threshold" in config else 0.1 + value = config["value"] if "value" in config else 0.0 + + # Now, build the encoder. + self.encoder = MultiLayerPerceptron( + layer_sizes = layer_sizes, + act_type = act_type, + reshape_index = 0, # We need to flatten the spatial dimensions of each fom frame. + reshape_shape = self.qgrid_size, + threshold = threshold, + value = value); + + self.decoder = MultiLayerPerceptron( + layer_sizes = layer_sizes[::-1], # Reverses the order of the the list. + act_type = act_type, + reshape_index = -1, + reshape_shape = self.qgrid_size, # We need to reshape the network output to a fom frame. + threshold = threshold, + value = value) + # All done! return - def forward(self, x): - x = self.encoder(x) - x = self.decoder(x) - return x + def forward(self, x : torch.Tensor) -> torch.Tensor: + """ + This function defines the forward pass through self. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + x: A tensor holding a batch of inputs. We pass this tensor through the encoder + decoder + and then return the result. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + The image of x under the encoder + decoder. + """ + + # Encoder the input + z : torch.Tensor = self.encoder(x) + + # Now decode z. + y : torch.Tensor = self.decoder(z) + + # All done! Hopefully y \approx x. + return y - def export(self): - dict_ = {'autoencoder_param': self.cpu().state_dict()} + + + def export(self) -> dict: + """ + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + This function extracts self's parameters and returns them in a dictionary. You can pass + the dictionary returned by this function to the load method of another Autoencoder object + (that you initialized to have the same architecture as self) to make the other autoencoder + identical to self. 
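+
+        For example (hypothetical usage): if ae and ae2 are Autoencoder objects built from the
+        same physics and config arguments, then ae2.load(ae.export()) makes ae2's parameters
+        identical to ae's.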
+ """ + + # TO DO: deep export which includes all information needed to re-initialize self from + # scratch. This would probably require changing the initializer. + + dict_ = { 'autoencoder_param' : self.cpu().state_dict()} return dict_ - def load(self, dict_): + + + def load(self, dict_ : dict) -> None: + """ + This function loads self's state dictionary. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + dict_: This should be a dictionary with the key "autoencoder_param" whose corresponding + value is the state dictionary of an autoencoder which has the same architecture (i.e., + layer sizes) as self. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ + self.load_state_dict(dict_['autoencoder_param']) return \ No newline at end of file diff --git a/src/lasdi/param.py b/src/lasdi/param.py index a8a2223..ec75e5c 100644 --- a/src/lasdi/param.py +++ b/src/lasdi/param.py @@ -1,83 +1,310 @@ -import numpy as np -from .inputs import InputParser +# ------------------------------------------------------------------------------------------------- +# Imports +# ------------------------------------------------------------------------------------------------- -def get_1dspace_from_list(config): - Nx = len(config['list']) - paramRange = np.array(config['list']) +import numpy as np +from .inputs import InputParser + + + +# ------------------------------------------------------------------------------------------------- +# Helper functions +# ------------------------------------------------------------------------------------------------- + +def get_1dspace_from_list(param_dict : dict) -> tuple[int, np.ndarray]: + """ + This function generates the parameter range (set of possible parameter values) for a parameter + that uses the list type test space. That is, "test_space_type" should be a key for the + parameter dictionary and the corresponding value should be "list". The parameter dictionary + should also have a "list" key whose value is a list of the possible parameter values. + + We parse this list and turn it into a numpy ndarray. + + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + param_dict: A dictionary specifying one of the parameters. We should fetch this from the + configuration yaml file. It must have a "list" key whose corresponding value is a list of + floats. + + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + Two arguments: Nx and paramRange. paramRange is a 1d numpy ndarray (whose ith value is the + i'th element of param_dict["list"]). Nx is the length of paramRange. + """ + + # In this case, the parameter dictionary should have a "list" attribute which should list the + # parameter values we want to test. Fetch it (it's length is Nx) and use it to generate an + # array of possible values. + Nx : int = len(param_dict['list']) + paramRange : np.ndarray = np.array(param_dict['list']) + + # All done! 
return Nx, paramRange -def create_uniform_1dspace(config): - Nx = config['sample_size'] - minval = config['min'] - maxval = config['max'] - if (config['log_scale']): - paramRange = np.exp(np.linspace(np.log(minval), np.log(maxval), Nx)) + + +def create_uniform_1dspace(param_dict : dict) -> tuple[int, np.ndarray]: + """ + This function generates the parameter range (set of possible parameter values) for a parameter + that uses the uniform type test space. That is, "test_space_type" should be a key for the + parameter dictionary and the corresponding value should be "uniform". The parameter dictionary + should also have the following keys: + "min" + "max" + "sample_size" + "log_scale" + "min" and "max" specify the minimum and maximum value of the parameter, respectively. + "sample_size" specifies the number of parameter values we generate. Finally, log_scale, if + true, specifies if we should use a uniform or logarithmic spacing between samples of the + parameter. + + The values corresponding to "min" and "max" should be floats while the values corresponding to + "sample_size" and "log_scale" should be an int and a bool, respectively. + + + ----------------------------------------------------------------------------------------------- + Arguments + ----------------------------------------------------------------------------------------------- + + param_dict: A dictionary specifying one of the parameters. We should fetch this from the + configuration yaml file. It must have a "min", "max", "sample_size", and "log_scale" + keys (see above). + + + ----------------------------------------------------------------------------------------------- + Returns + ----------------------------------------------------------------------------------------------- + + Two arguments: Nx and paramRange. paramRange is a 1d numpy ndarray (whose ith value is the + i'th possible value of the parameter. Thus, paramRange[0] = param_dict["min"] and + paramRange[-1] = param_dict["max"]). Nx is the length of paramRange or, equivalently + param_dict["sample_size"]. + """ + + # Fetch the number of samples and the min/max value for this parameter. + Nx : int = param_dict['sample_size'] + minval : float = param_dict['min'] + maxval : float = param_dict['max'] + + # Generate the range of parameter values. Note that we have to generate a uniform grid in the + # log space, then exponentiate it to generate logarithmic spacing. + if (param_dict['log_scale']): + paramRange : np.ndarray = np.exp(np.linspace(np.log(minval), np.log(maxval), Nx)) else: - paramRange = np.linspace(minval, maxval, Nx) + paramRange : np.ndarray = np.linspace(minval, maxval, Nx) + + # All done! return Nx, paramRange -getParam1DSpace = {'list': get_1dspace_from_list, - 'uniform': create_uniform_1dspace} + + +# A macro that allows us to switch function we use to generate generate a parameter's range. 
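+# For example (illustrative values, not taken from the configuration files in this repository), a
+# parameter dictionary such as
+#     {'name': 'k', 'test_space_type': 'uniform', 'min': 0.1, 'max': 1.0, 'sample_size': 5, 'log_scale': False}
+# is dispatched to create_uniform_1dspace, while a dictionary with 'test_space_type': 'list' and a
+# 'list' key holding the candidate values is dispatched to get_1dspace_from_list.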
+getParam1DSpace : dict[str, callable] = {'list' : get_1dspace_from_list, + 'uniform' : create_uniform_1dspace} + + + +# ------------------------------------------------------------------------------------------------- +# ParameterSpace Class +# ------------------------------------------------------------------------------------------------- class ParameterSpace: - param_list = [] - param_name = [] - n_param = 0 - train_space = None - test_space = None - n_init = 0 - test_grid_sizes = [] - test_meshgrid = None - - def __init__(self, config): - assert('parameter_space' in config) - parser = InputParser(config['parameter_space'], name="param_space_input") - - self.param_list = parser.getInput(['parameters'], datatype=list) - self.n_param = len(self.param_list) - - self.param_name = [] + # Initialize class variables + param_list : list[dict] = [] # A list housing the parameter dictionaries (from the yml file) + param_name : list[str] = [] # A list housing the parameter names. + n_param : int = 0 # The number of parameters. + train_space : np.ndarray = None # A 2D array of shape (n_train, n_param) whose i,j element is the j'th parameter value in the i'th combination of training parameters. + test_space : np.ndarray = None # A 2D array of shape (n_test, n_param) whose i,j element is the j'th parameter value in the i'th combination of testing parameters. + n_init : int = 0 # The number of combinations of parameters in the training set??? + test_grid_sizes : list[int] = [] # A list whose i'th element is the number of different values of the i'th parameter in the test instances. + test_meshgrid : tuple[np.ndarray] = None + + + + def __init__(self, config : dict) -> None: + """ + Initializes a ParameterSpace object using the settings passed in the conf dictionary (which + should have been read from a yaml file). + + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + config: This is a dictionary that houses the settings we want to use to run the code. This + should have been read from a yaml file. We assume it contains the following keys. If one + or more keys are tabbed over relative to one key above them, then the one above is a + dictionary and the ones below should be keys within that dictionary. + - parameter_space + - parameters (this should have at least one parameter defined!) + - test_space + - type (should be "grid") + """ + + # Make sure the configuration dictionary has a "parameter_space" setting. This should house + # information about which variables are present in the code, as well as how we want to test + # the various possible parameter values. + assert('parameter_space' in config); + + # Load the parameter_space settings into an InputParser object (which makes it easy to + # fetch setting values). We then fetch the list of parameters. Each parameters has a name, + # min and max, and information on how many instances we want. + parser = InputParser(config['parameter_space'], name = "param_space_input") + self.param_list : list[dict] = parser.getInput(['parameters'], datatype = list) + + # Fetch the parameter names. + self.param_name : list[str] = [] for param in self.param_list: - self.param_name += [param['name']] + self.param_name += [param['name']]; - self.train_space = self.createInitialTrainSpace(self.param_list) - self.n_init = self.train_space.shape[0] + # First, let's fetch the set of possible parameter values. 
This yields a 2^k x k matrix, + # where k is the number of parameters. The i,j entry of this matrix gives the value of the + # j'th parameter on the i'th instance. + self.train_space = self.createInitialTrainSpace(self.param_list) + self.n_init = self.train_space.shape[0] - test_space_type = parser.getInput(['test_space', 'type'], datatype=str) + # Next, let's make a set of possible parameter values to test at. + test_space_type = parser.getInput(['test_space', 'type'], datatype = str) if (test_space_type == 'grid'): + # Generate the set possible parameter combinations. See the docstring for + # "createTestGridSpace" for details. self.test_grid_sizes, self.test_meshgrid, self.test_space = self.createTestGridSpace(self.param_list) + # All done! return - def n_train(self): + + + def n_train(self) -> int: + """ + Returns the number of combinations of parameters in the training set. + """ + return self.train_space.shape[0] - def n_test(self): + + + def n_test(self) -> int: + """ + Returns the number of combinations of parameters in the testing set. + """ + return self.test_space.shape[0] - - def createInitialTrainSpace(self, param_list): - paramRanges = [] - for param in param_list: - minval = param['min'] - maxval = param['max'] + + + def createInitialTrainSpace(self, param_list : list[dict]) -> np.ndarray: + """ + Sets up a grid of parameter values to train at. Note that we only use the min and max value + of each parameter when setting up this grid. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + param_list: A list of parameter dictionaries. Each entry should be a dictionary with the + following keys: + - name + - min + - max + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + A 2d array of shape ((2)^k, k), where k is the number of parameters (k == len(param_list)). + The i'th column is the flattened i'th mesh_grid array we when we create a mesh grid using + the min and max value of each parameter as the argument. See "createHyperMeshGrid" for + details. + + Specifically, we return exactly what "createHyperGridSpace" returns. See the doc-string + for that function for further details. + """ + + # We need to know the min and max value for each parameter to set up the grid of possible + # parameter values. + paramRanges : list[np.ndarray] = [] + + for param in param_list: + # Fetch the min, max value of the current parameter. + minval : float = param['min'] + maxval : float = param['max'] + + # Store these values in an array and add them to the list. paramRanges += [np.array([minval, maxval])] - mesh_grids = self.createHyperMeshGrid(paramRanges) + # Use the ranges to set up a grid of possible parameter values. + mesh_grids : tuple[np.ndarray] = self.createHyperMeshGrid(paramRanges) + + # Now combine these grids into a 2d return self.createHyperGridSpace(mesh_grids) - def createTestGridSpace(self, param_list): - paramRanges = [] - gridSizes = [] + + def createTestGridSpace(self, param_list : list[dict]) -> tuple[list[int], tuple[np.ndarray], np.ndarray]: + """ + This function sets up a grid of parameter values to test at. 
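+        For example (illustrative): with two parameters whose test ranges contain 3 and 4 values,
+        respectively, this returns gridSizes = [3, 4], a tuple of two (3, 4) mesh arrays, and a
+        (12, 2) array holding every combination of test parameter values.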
+ + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + param_list: A list of parameter dictionaries. Each dictionary should either use the + "uniform" or "list" format. See create_uniform_1dspace and get_1dspace_from_list, + respectively. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + A three element tuple. + + The first is a list whose i'th element specifies the number of distinct values of the i'th + parameter we consider (this is the length of the i'th element of "paramRanges" below). + + The second is a a tuple of k numpy ndarrays (where k = len(param_list)), the i'th one of + which is a k-dimensional array with shape (N0, ... , N{k - 1}), where Ni = + param_list[i].size whose i(0), ... , i(k - 1) element specifies the value of the i'th + parameter in the i(0), ... , i(k - 1)'th unique combination of parameter values. + + The third one is a 2d array of parameter values. It has shape (M, k), where + M = \prod_{i = 0}^{k - 1} param_list[i].size. + """ + + # Set up arrays to hold the parameter values + number of parameter values for each + # parameter. + paramRanges : np.ndarray = [] + gridSizes : list[int] = [] + + # Cycle through the parameters for param in param_list: - Nx, paramRange = getParam1DSpace[param['test_space_type']](param) - gridSizes += [Nx] - paramRanges += [paramRange] + # Fetch the set of possible parameter values (paramRange) + the size of this set (Nx) + Nx, paramRange = getParam1DSpace[param['test_space_type']](param) + + # Add Nx, ParamRange to their corresponding lists + gridSizes += [Nx] + paramRanges += [paramRange] - mesh_grids = self.createHyperMeshGrid(paramRanges) + # Now that we have the set of parameter values for each parameter, turn it into a grid. + mesh_grids : tuple[np.ndarray] = self.createHyperMeshGrid(paramRanges) + + # All done. Return a list specifying the number of possible values for each parameter, the + # grids of parameter values, and the flattened/2d version of it. return gridSizes, mesh_grids, self.createHyperGridSpace(mesh_grids) + + def getParameter(self, param_vector): ''' convert numpy array parameter vector to a dict. @@ -91,63 +318,210 @@ def getParameter(self, param_vector): return param - def createHyperMeshGrid(self, param_ranges): + + + def createHyperMeshGrid(self, param_ranges : list[np.ndarray]) -> tuple[np.ndarray]: ''' - param_ranges: list of numpy 1d arrays, each corresponding to 1d parameter grid space. - The list size is equal to the number of parameters. - - Output: paramSpaces - - tuple of numpy nd arrays, corresponding to each parameter. - Dimension of the array equals to the number of parameters + This function generates arrays of parameter values. Specifically, if there are k + parameters (param_ranges has k elements), then we return k k-d arrays, the i'th one of + which is a k-dimensional array whose i(0), ... , i(k - 1) element specifies the value of + the i'th variable in the i(0), ... , i(k - 1)'th unique combination of parameter values. 
+ + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + param_ranges: list of numpy 1d arrays, each corresponding to 1d parameter grid space. The + i'th element of this list should be a 2-element numpy.ndarray object housing the max and + min value for the i'th parameter. The list size should equal the number of parameters. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + the "paramSpaces" tuple. This is a tuple of numpy ndarray objects, the i'th one of which + gives the grid of parameter values for the i'th parameter. Specifically, if there are + k parameters and if param_range[i].size = Ni, then the j'th return array has shape + (N0, ... , N{k - 1}) and the i(0), ... , i(k - 1) element of this array houses the i(j)'th + value of the j'th parameter. + + Thus, if there are k parameters, the returned tuple has k elements, each one of + which is an array of shape (N0, ... , N{k - 1}). ''' - args = () + + # Fetch the ranges, add them to a tuple (this is what the meshgrid function needs). + args : tuple[np.ndarray] = () for paramRange in param_ranges: args += (paramRange,) - paramSpaces = np.meshgrid(*args, indexing='ij') + # Use numpy's meshgrid function to generate the grids of parameter values. + paramSpaces : tuple[np.ndarray] = np.meshgrid(*args, indexing='ij') + + # All done! return paramSpaces - def createHyperGridSpace(self, mesh_grids): + + + def createHyperGridSpace(self, mesh_grids : tuple[np.ndarray]) -> np.ndarray: ''' - mesh_grids: tuple of numpy nd arrays, corresponding to each parameter. - Dimension of the array equals to the number of parameters - - Output: param_grid - - numpy 2d array of size (grid size x number of parameters). + Flattens the mesh_grid numpy.ndarray objects returned by createHyperMeshGrid and combines + them into a single 2d array of shape (grid size, number of parameters) (see below). + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- - grid size is the size of a numpy nd array. + mesh_grids: tuple of numpy nd arrays, corresponding to each parameter. This should ALWAYS + be the output of the "CreateHyperMeshGrid" function. See the return section of that + function's docstring for details. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + The param_grid. This is a 2d numpy.ndarray object of shape (grid size, number of + parameters). If each element of mesh_grids is a numpy.ndarray object of shape (N(1), ... , + N(k)) (k parameters), then (grid size) = N(1)*N(2)*...*N(k) and (number of parameters) = k. ''' - param_grid = None + + # For each parameter, we flatten its mesh_grid into a 1d array (of length (grid size)). We + # horizontally stack these flattened grids to get the final param_grid array. 
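+        # For example (illustrative): with two parameters whose mesh grids each have shape (2, 3),
+        # each grid flattens to a (6, 1) column, so the final param_grid has shape (6, 2), one row
+        # per combination of parameter values.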
+ param_grid : np.ndarray = None for k, paramSpace in enumerate(mesh_grids): + # Special treatment for the first parameter to initialize param_grid if (k == 0): - param_grid = paramSpace.reshape(-1, 1) + param_grid : np.ndarray = paramSpace.reshape(-1, 1) continue - param_grid = np.hstack((param_grid, paramSpace.reshape(-1, 1))) + # Flatten the new mesh grid and append it to the grid. + param_grid : np.ndarray = np.hstack((param_grid, paramSpace.reshape(-1, 1))) + # All done! return param_grid - def appendTrainSpace(self, param): + + + def appendTrainSpace(self, param : np.ndarray) -> None: + """ + Adds a new parameter to self's train space attribute. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + param: A 1d numpy ndarray object. It should have shape (n_param) and should hold a + parameter value that we want to add to the training set. + + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ + + # Make sure param has n_param components/can be appended to the set of training parameters. assert(self.train_space.shape[1] == param.size) - self.train_space = np.vstack((self.train_space, param)) + # Add the new parameter to the training space by appending it as a new row to + # self.train_space + self.train_space : np.ndarray = np.vstack((self.train_space, param)) + + # All done! return - def export(self): - dict_ = {'train_space': self.train_space, - 'test_space': self.test_space, - 'test_grid_sizes': self.test_grid_sizes, - 'test_meshgrid': self.test_meshgrid, - 'n_init': self.n_init} + + + def export(self) -> dict: + """ + This function packages the testing/training examples into a dictionary, which it returns. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + None! + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + A dictionary with 4 keys. Below is a list of the keys with a short description of each + corresponding value. + train_space: self.train_space, a 2d array of shape (n_train, n_param) whose i,j element + holds the value of the j'th parameter in the i'th training case. + + test_space: self.test_space, a 2d array of shape (n_test, n_param) whose i,j element + holds the value of the j'th parameter in the i'th testing case. + + test_grid_sizes: A list whose i'th element specifies how many distinct parameter values + we use for the i'th parameter. + + test_meshgrid: a tuple of n_param numpy.ndarray array objects whose i'th element is a + n_param-dimensional array whose i(1), i(2), ... , i(n_param) element holds the value of + the i'th parameter in the i(1), ... , i(n_param) combination of parameter values in the + testing test. + + n_init: The number of combinations of training parameters in the training set. + """ + + # Build the dictionary + dict_ = {'train_space' : self.train_space, + 'test_space' : self.test_space, + 'test_grid_sizes' : self.test_grid_sizes, + 'test_meshgrid' : self.test_meshgrid, + 'n_init' : self.n_init} + + # All done! 
return dict_ - def load(self, dict_): - self.train_space = dict_['train_space'] - self.test_space = dict_['test_space'] - self.test_grid_sizes = dict_['test_grid_sizes'] - self.test_meshgrid = dict_['test_meshgrid'] - - assert(self.n_init == dict_['n_init']) - assert(self.train_space.shape[1] == self.n_param) - assert(self.test_space.shape[1] == self.n_param) + + + def load(self, dict_ : dict) -> None: + """ + This function builds a parameter space object from a dictionary. This dictionary should + be one that was returned by the export method, or a loaded copy of a dictionary that was + returned by the export method. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + dict_: This should be a dictionary with the following keys: + - train_space + - test_space + - test_grid_sizes + - test_meshgrid + - n_init + This dictionary should have been returned by the export method. We use the values in this + dictionary to set up self. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ + + # Extract information from the dictionary. + self.train_space : np.ndarray = dict_['train_space'] + self.test_space : np.ndarray = dict_['test_space'] + self.test_grid_sizes : list[int] = dict_['test_grid_sizes'] + self.test_meshgrid : tuple[np.ndarray] = dict_['test_meshgrid'] + + # Run checks + assert(self.n_init == dict_['n_init']) + assert(self.train_space.shape[1] == self.n_param) + assert(self.test_space.shape[1] == self.n_param) + + # All done! return diff --git a/src/lasdi/physics/burgers1d.py b/src/lasdi/physics/burgers1d.py index ad2dd32..f7c3bdc 100644 --- a/src/lasdi/physics/burgers1d.py +++ b/src/lasdi/physics/burgers1d.py @@ -21,9 +21,10 @@ def __init__(self, cfg, param_name=None): self.offline = parser.getInput(['offline_driver'], fallback=False) - self.nt = parser.getInput(['number_of_timesteps'], datatype=int) - self.grid_size = parser.getInput(['grid_size'], datatype=list) - self.qgrid_size = self.grid_size + self.nt : int = parser.getInput(['number_of_timesteps'], datatype = int) + self.grid_size : list[int] = parser.getInput(['grid_size'], datatype = list) + self.qgrid_size : list[int] = self.grid_size + assert(self.dim == len(self.grid_size)) self.xmin = parser.getInput(['xmin'], datatype=float) diff --git a/src/lasdi/timing.py b/src/lasdi/timing.py index 978045a..f36388b 100644 --- a/src/lasdi/timing.py +++ b/src/lasdi/timing.py @@ -1,135 +1,210 @@ -""" - -.. _NumPy docstring standard: - https://numpydoc.readthedocs.io/en/latest/format.html#docstring-standard - -""" +# ------------------------------------------------------------------------------------------------- +# Imports +# ------------------------------------------------------------------------------------------------- from time import perf_counter -class Timer: - """A light-weight timer class. 
- """ - def __init__(self): - - self.names = {} - """:obj:`dict(str:int)`: Dictionary that maps job names to job indices.""" - self.calls = [] - """:obj:`list(int)`: List that stores the number of calls for each job.""" +# ------------------------------------------------------------------------------------------------- +# Timer class +# ------------------------------------------------------------------------------------------------- - self.times = [] - """:obj:`list(float)`: List that stores the total time for each job.""" - - self.starts = [] - """ - :obj:`list(float)`: List that stores the start time for each job. - If the job is not running, :obj:`None` is stored instead. - """ +class Timer: + def __init__(self): + # Set instance variables + self.names : dict = {} # Dictionary of named timers. key = name, value = index + self.calls : list = [] # k'th element = # of times we have called the k'th timer + self.times : list = [] # k'th element = total time recorded by the k'th timer + self.starts : list = [] # k'th element = start time for the k'th timer (if running) return + - def start(self, name): - """Start a job named :obj:`name`. - If the job is not listed, register the job in the job list. - Args: - name (:obj:`str`): Name of the job to be started. + def start(self, name : str) -> None: + """ + Starts a specific timer. The user must specify the name of the timer they want to start. + The specified timer can not already be running! + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + name: A string specifying the name of the timer you want to start. - Note: - The job must not have started before calling this method. - Returns: - Does not return a value. + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + Nothing! """ + # If the timer does not already exist, initialize it if name not in self.names: + # If this is the k'th timer that we have added to self (which would be the case if + # self.names has k keys), then we store that information in the "names" dictionary. + # We then store other information about that timer (such as its current value) in the + # k'th element of the calls/times/starts lists. self.names[name] = len(self.names) - self.calls += [0] - self.times += [0.0] - self.starts += [None] + self.calls += [0] + self.times += [0.0] + self.starts += [None] - idx = self.names[name] - # check if the job is already being measured + # Fetch the current timer's number. + idx : int = self.names[name] + + # Make sure the current timer has not already started. If so, it is already running and we + # need to raise an exception. if (self.starts[idx] is not None): raise RuntimeError("Timer.start: %s timer is already ticking!" % name) + + # Set the current timer's start element to the time when we started this timer. self.starts[idx] = perf_counter() + + # All done! return - def end(self, name): - """End a job named :obj:`name`. - Increase the number of calls and the runtime for the job. - Args: - name (:obj:`str`): Name of the job to be ended. - Note: - The job must have started before calling this method. + def end(self, name : str) -> None: + """ + Stops a specific timer. The user must specify the name of the timer they want to stop. 
+ + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- - Returns: - Does not return a value. + name: A string specifying the name of the timer you want to stop. + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! """ + + # Make sure the requested timer actually exists. assert(name in self.names) + # Fetch the requested timers' index. idx = self.names[name] - # check if the job has started. + + # Make sure the requested timer is actually running. if (self.starts[idx] is None): raise RuntimeError("Timer.end: %s start time is not measured yet!" % name) + # Record the time since the current timer started, add it to the running total for this + # timer. Also increment the number of calls to this timer + reset it's start value to + # None (so we can start it again). self.times[idx] += perf_counter() - self.starts[idx] self.calls[idx] += 1 self.starts[idx] = None + + # All done! return - def print(self): - """Print the list of jobs and their number of calls, total time and time per each call. - Returns: - Does not return a value. - """ + def print(self) -> None: + """ + This function reports information on every timer in self. It has no arguments and returns + nothing. + """ + + # Header print("Function name\tCalls\tTotal time\tTime/call\n") + + # Cycle through timers. for name, idx in self.names.items(): print("%s\t%d\t%.3e\t%.3e\n" % (name, self.calls[idx], self.times[idx], self.times[idx] / self.calls[idx])) + + # All done! return - def export(self): - """Export the list of jobs and their number of calls and total time - into a dictionary. - Note: - All jobs must be ended before calling this method. - Returns: - :obj:`dict` that contains "names", "calls", and "times" as keys + def export(self) -> dict: """ + This function extracts the names, calls, and times attributes of self, stores them in a + dictionary, and then returns that dictionary. If you have another dictionary object, + you can passed the returned dictionary to that object's load method to make that object + into an identical copy of self. + + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + None! + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + A dictionary housing the names, calls, and times attributes of self. The returned + dictionary has three keys: + - names + - calls + - times + + names is a dictionary with string keys whose corresponding values are integer indexes. If a + particular timer was the k'th one added to self, then it's value in names will be k. + + calls is a list whose k'th element specifies how many times the k'th timer was stopped. + times is a list whose k'th element specifies the total time recorded on the k'th timer. + """ + + # Make sure that no timers are currently running. for start in self.starts: if (start is not None): raise RuntimeError('Timer.export: cannot export while Timer is still ticking!') - param_dict = {} + # Set up a dictionary to house the timer information. 
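+        # For example (illustrative), after timing a single job named "train" once, the returned
+        # dictionary might look like {'names': {'train': 0}, 'calls': [1], 'times': [0.37]}.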
+ param_dict : dict = {} + + # Store names, calls, and timers (but not starts... we don't need that) in the dictionary. param_dict["names"] = self.names param_dict["calls"] = self.calls param_dict["times"] = self.times - return param_dict - - def load(self, dict_): - """Load the list of jobs and their number of calls and total time - from a dictionary. - Args: - `dict_` (:obj:`dict`): Dictionary that contains the list of jobs and their calls and times. + # All done! + return param_dict + - Note: - :obj:`dict_['names']`, :obj:`dict_['calls']` and :obj:`dict_['times']` must have the same size. - Returns: - Does not return a value + def load(self, dict_ : dict) -> None: """ + This function de-serializes a timer object, making self into an identical copy of a + previously serialized timer object. Specifically, we replace self's names, calls, and + times attributes using those in the passed dict_. We use this function to restore a + timer object's state after loading from a checkpoint. + + ------------------------------------------------------------------------------------------- + Arguments + ------------------------------------------------------------------------------------------- + + dict_: This should be a dictionary with three keys: + - names + - calls + - times + the corresponding values should be the names, calls, and times attributes of another timer + object, respectively. We replace self's attributes with those the values in dict_. dict_ + should be the dictionary returned by calling export on a timer object. + + + ------------------------------------------------------------------------------------------- + Returns + ------------------------------------------------------------------------------------------- + + Nothing! + """ self.names = dict_['names'] self.calls = dict_['calls'] self.times = dict_['times']
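
For reference, here is a minimal usage sketch of the reworked Timer class (illustrative only, not
part of the diff; it assumes the package is importable as lasdi and uses time.sleep as a stand-in
for real work):

    from time import sleep
    from lasdi.timing import Timer

    timer = Timer()

    timer.start("assemble")       # registers the "assemble" timer on first use and starts it
    sleep(0.1)                    # stand-in for the work being timed
    timer.end("assemble")         # accumulates the elapsed time and increments the call count

    timer.print()                 # one row per timer: name, calls, total time, time per call

    # Serialize the timer state, then restore it into a fresh Timer (e.g., after a restart).
    state    = timer.export()
    restored = Timer()
    restored.load(state)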