diff --git a/cca_zoo/deep/architectures.py b/cca_zoo/deep/architectures.py index 78444c14..99222764 100644 --- a/cca_zoo/deep/architectures.py +++ b/cca_zoo/deep/architectures.py @@ -1,4 +1,3 @@ -from abc import abstractmethod from math import sqrt from typing import Iterable @@ -6,35 +5,28 @@ from torch import nn -class _BaseEncoder(torch.nn.Module): - @abstractmethod +class BaseEncoder(nn.Module): def __init__(self, latent_dimensions: int, variational: bool = False): - super(_BaseEncoder, self).__init__() - self.variational = variational + super(BaseEncoder, self).__init__() self.latent_dimensions = latent_dimensions + self.variational = variational - @abstractmethod - def forward(self, x): - pass - - -class _BaseDecoder(torch.nn.Module): - @abstractmethod - def __init__(self, latent_dimensions: int): - super(_BaseDecoder, self).__init__() - self.latent_dimensions = latent_dimensions + def create_variational_layers(self, in_features): + self.fc_mu = torch.nn.Linear(in_features, self.latent_dimensions) + self.fc_var = torch.nn.Linear(in_features, self.latent_dimensions) - @abstractmethod - def forward(self, x): - pass + def forward_variational(self, x): + mu = self.fc_mu(x) + logvar = self.fc_var(x) + return mu, logvar -class Encoder(_BaseEncoder): +class Encoder(BaseEncoder): def __init__( self, latent_dimensions: int, + feature_size: int, variational: bool = False, - feature_size: int = 784, layer_sizes: tuple = None, activation=nn.LeakyReLU(), dropout=0, @@ -43,244 +35,201 @@ def __init__( if layer_sizes is None: layer_sizes = (128,) layer_sizes = (feature_size,) + layer_sizes + (latent_dimensions,) - layers = [] - # other layers - for l_id in range(len(layer_sizes) - 2): - layers.append( - torch.nn.Sequential( - nn.Dropout(p=dropout), - torch.nn.Linear(layer_sizes[l_id], layer_sizes[l_id + 1]), - activation, - ) + layers = [ + nn.Sequential( + nn.Dropout(p=dropout), + nn.Linear(layer_sizes[i], layer_sizes[i + 1]), + activation, ) - self.layers = torch.nn.Sequential(*layers) + for i in range(len(layer_sizes) - 2) + ] + self.layers = nn.Sequential(*layers) if self.variational: - self.fc_mu = torch.nn.Sequential( - nn.Dropout(p=dropout), torch.nn.Linear(layer_sizes[-2], layer_sizes[-1]) - ) - self.fc_var = torch.nn.Sequential( - nn.Dropout(p=dropout), torch.nn.Linear(layer_sizes[-2], layer_sizes[-1]) - ) + self.create_variational_layers(layer_sizes[-2]) else: - self.fc = torch.nn.Sequential( - nn.Dropout(p=dropout), torch.nn.Linear(layer_sizes[-2], layer_sizes[-1]) - ) + self.fc = nn.Linear(layer_sizes[-2], layer_sizes[-1]) def forward(self, x): x = self.layers(x) if self.variational: - mu = self.fc_mu(x) - logvar = self.fc_var(x) - return mu, logvar - else: - x = self.fc(x) - return x + return self.forward_variational(x) + return self.fc(x) -class Decoder(_BaseDecoder): +class LinearEncoder(BaseEncoder): def __init__( - self, - latent_dimensions: int, - feature_size: int = 784, - layer_sizes: tuple = None, - activation=nn.LeakyReLU(), - dropout=0, + self, latent_dimensions: int, feature_size: int, variational: bool = False ): - super(Decoder, self).__init__(latent_dimensions) - if layer_sizes is None: - layer_sizes = (128,) - layer_sizes = (latent_dimensions,) + layer_sizes + (feature_size,) - layers = [] - for l_id in range(len(layer_sizes) - 1): - layers.append( - torch.nn.Sequential( - nn.Dropout(p=dropout), - torch.nn.Linear(layer_sizes[l_id], layer_sizes[l_id + 1]), - activation, - ) - ) - self.layers = torch.nn.Sequential(*layers) + super(LinearEncoder, self).__init__(latent_dimensions, variational=variational) + if self.variational: + self.create_variational_layers(feature_size) + else: + self.fc = torch.nn.Linear(feature_size, latent_dimensions) def forward(self, x): - x = self.layers(x) - return x + if self.variational: + return self.forward_variational(x) + return self.fc(x) -class CNNEncoder(_BaseEncoder): +class CNNEncoder(BaseEncoder): def __init__( self, latent_dimensions: int, + feature_size: Iterable, variational: bool = False, - feature_size: Iterable = (28, 28), channels: tuple = None, kernel_sizes: tuple = None, - stride: tuple = None, - padding: tuple = None, + strides: tuple = None, + paddings: tuple = None, activation=nn.LeakyReLU(), dropout=0, ): super(CNNEncoder, self).__init__(latent_dimensions, variational=variational) + + default_len = 2 if channels is None else len(channels) if channels is None: channels = (1, 1) - if kernel_sizes is None: - kernel_sizes = (5,) * (len(channels)) - if stride is None: - stride = (1,) * (len(channels)) - if padding is None: - padding = (2,) * (len(channels)) - # assume square input - conv_layers = [] - current_size = feature_size[0] + kernel_sizes = kernel_sizes or (5,) * default_len + strides = strides or (1,) * default_len + paddings = paddings or (2,) * default_len + + self.conv_layers = self._build_conv_layers( + channels, kernel_sizes, strides, paddings, activation + ) + + final_channels = channels[-1] + final_size = feature_size[0] # Assuming square input + linear_input_size = final_channels * final_size * final_size + + if self.variational: + self.create_variational_layers(linear_input_size) + else: + self.fc = nn.Sequential( + nn.Dropout(p=dropout), nn.Linear(linear_input_size, latent_dimensions) + ) + + def _build_conv_layers( + self, channels, kernel_sizes, strides, paddings, activation + ): + layers = [] current_channels = 1 - for l_id in range(len(channels) - 1): - conv_layers.append( - torch.nn.Sequential( - torch.nn.Conv2d( - in_channels=current_channels, # input height - out_channels=channels[l_id], # n_filters - kernel_size=kernel_sizes[l_id], # filter size - stride=stride[l_id], # filter movement/step - padding=padding[l_id], - # if want same width and length of this image after Conv2d, padding=(kernel_size-1)/2 if - # stride=1 - ), # output shape (out_channels, current_size, current_size) - activation, # activation + for idx in range(len(channels)): + layers.append( + nn.Sequential( + nn.Conv2d( + in_channels=current_channels, + out_channels=channels[idx], + kernel_size=kernel_sizes[idx], + stride=strides[idx], + padding=paddings[idx], + ), + activation, ) ) - current_size = current_size - current_channels = channels[l_id] + current_channels = channels[idx] + return nn.Sequential(*layers) + def forward(self, x): + x = self.conv_layers(x) + x = x.view(x.size(0), -1) if self.variational: - self.fc_mu = torch.nn.Sequential( - nn.Dropout(p=dropout), - torch.nn.Linear( - int(current_size * current_size * current_channels), - latent_dimensions, - ), - ) - self.fc_var = torch.nn.Sequential( - nn.Dropout(p=dropout), - torch.nn.Linear( - int(current_size * current_size * current_channels), - latent_dimensions, - ), - ) - else: - self.fc = torch.nn.Sequential( + return self.forward_variational(x) + return self.fc(x) + + +class Decoder(nn.Module): + def __init__( + self, + latent_dimensions: int, + feature_size: int = 784, + layer_sizes: tuple = None, + activation=nn.LeakyReLU(), + dropout=0, + ): + super(Decoder, self).__init__() + if layer_sizes is None: + layer_sizes = (128,) + layer_sizes = (latent_dimensions,) + layer_sizes + (feature_size,) + layers = [ + nn.Sequential( nn.Dropout(p=dropout), - torch.nn.Linear( - int(current_size * current_size * current_channels), - latent_dimensions, - ), + nn.Linear(layer_sizes[i], layer_sizes[i + 1]), + activation, ) - self.conv_layers = torch.nn.Sequential(*conv_layers) + for i in range(len(layer_sizes) - 1) + ] + self.layers = nn.Sequential(*layers) def forward(self, x): - x = self.conv_layers(x) - x = x.reshape((x.shape[0], -1)) - if self.variational: - mu = self.fc_mu(x) - logvar = self.fc_var(x) - return mu, logvar - else: - x = self.fc(x) - return x + return self.layers(x) -class CNNDecoder(_BaseDecoder): +class CNNDecoder(nn.Module): def __init__( self, latent_dimensions: int, feature_size: Iterable = (28, 28), channels: tuple = None, - kernel_sizes=None, - strides=None, - paddings=None, + kernel_sizes: tuple = None, + strides: tuple = None, + paddings: tuple = None, activation=nn.LeakyReLU(), dropout=0, ): - super(CNNDecoder, self).__init__(latent_dimensions) + super(CNNDecoder, self).__init__() + + default_len = 2 if channels is None else len(channels) if channels is None: channels = (1, 1) - if kernel_sizes is None: - kernel_sizes = (5,) * len(channels) - if strides is None: - strides = (1,) * len(channels) - if paddings is None: - paddings = (2,) * len(channels) - conv_layers = [] - current_channels = 1 - current_size = feature_size[0] - for l_id, (channel, kernel, stride, padding) in reversed( - list(enumerate(zip(channels, kernel_sizes, strides, paddings))) - ): - conv_layers.append( - torch.nn.Sequential( - torch.nn.ConvTranspose2d( - in_channels=channel, # input height - out_channels=current_channels, - kernel_size=kernel_sizes[l_id], - stride=strides[l_id], # filter movement/step - padding=paddings[l_id], - # if want same width and length of this image after Conv2d, padding=(kernel_size-1)/2 if - # stride=1 - ), - activation, - ) - ) - current_size = current_size - current_channels = channel + kernel_sizes = kernel_sizes or (5,) * default_len + strides = strides or (1,) * default_len + paddings = paddings or (2,) * default_len - # reverse layers as constructed in reverse - self.conv_layers = torch.nn.Sequential(*conv_layers[::-1]) - self.fc_layer = torch.nn.Sequential( + self.conv_layers = self._build_transpose_conv_layers( + feature_size, channels, kernel_sizes, strides, paddings, activation + ) + + self.fc_layer = nn.Sequential( nn.Dropout(p=dropout), - torch.nn.Linear( - latent_dimensions, int(current_size * current_size * current_channels) + nn.Linear( + latent_dimensions, channels[0] * feature_size[0] * feature_size[1] ), activation, ) - def forward(self, x): - x = self.fc_layer(x) - x = x.reshape((x.shape[0], self.conv_layers[0][0].in_channels, -1)) - x = x.reshape( - ( - x.shape[0], - self.conv_layers[0][0].in_channels, - int(sqrt(x.shape[-1])), - int(sqrt(x.shape[-1])), - ) - ) - x = self.conv_layers(x) - return x - - -class LinearEncoder(_BaseEncoder): - def __init__( - self, latent_dimensions: int, feature_size: int, variational: bool = False + def _build_transpose_conv_layers( + self, feature_size, channels, kernel_sizes, strides, paddings, activation ): - super(LinearEncoder, self).__init__(latent_dimensions, variational=variational) - self.variational = variational - - if self.variational: - self.fc_mu = torch.nn.Linear(feature_size, latent_dimensions) - self.fc_var = torch.nn.Linear(feature_size, latent_dimensions) - else: - self.fc = torch.nn.Linear(feature_size, latent_dimensions) + layers = [] + for idx in reversed(range(len(channels) - 1)): + layers.append( + nn.Sequential( + nn.ConvTranspose2d( + in_channels=channels[idx], + out_channels=channels[idx + 1], + kernel_size=kernel_sizes[idx], + stride=strides[idx], + padding=paddings[idx], + ), + activation, + ) + ) + return nn.Sequential(*layers) def forward(self, x): - if self.variational: - mu = self.fc_mu(x) - logvar = self.fc_var(x) - return mu, logvar - else: - x = self.fc(x) - return x + x = self.fc_layer(x) + x = x.view( + x.size(0), + self.conv_layers[0][0].in_channels, + int(sqrt(x.size(1))), + int(sqrt(x.size(1))), + ) + return self.conv_layers(x) -class LinearDecoder(_BaseDecoder): +class LinearDecoder(nn.Module): def __init__(self, latent_dimensions: int, feature_size: int): super(LinearDecoder, self).__init__(latent_dimensions) self.linear = torch.nn.Linear(latent_dimensions, feature_size) diff --git a/cca_zoo/deep/data.py b/cca_zoo/deep/data.py index 5f1f6cbd..5eea4354 100644 --- a/cca_zoo/deep/data.py +++ b/cca_zoo/deep/data.py @@ -103,11 +103,11 @@ def get_dataloaders( class DoubleNumpyDataset(NumpyDataset): - def __init__(self, views, batch_size=None): + def __init__(self, views, batch_size=None, random_state=None): super().__init__(views) self.views = [view.astype(np.float32) for view in views] self.batch_size = batch_size - self.random_state = check_random_state(0) + self.random_state = check_random_state(random_state) def __getitem__(self, index): views = [view[index] for view in self.views] diff --git a/cca_zoo/linear/_gradient/_ey.py b/cca_zoo/linear/_gradient/_ey.py index f6b37f15..dac9400f 100644 --- a/cca_zoo/linear/_gradient/_ey.py +++ b/cca_zoo/linear/_gradient/_ey.py @@ -56,9 +56,9 @@ def validation_step(self, batch, batch_idx): return loss["objective"] def get_dataset(self, views: Iterable[np.ndarray], validation_views=None): - dataset = DoubleNumpyDataset(views, batch_size=self.batch_size) + dataset = DoubleNumpyDataset(views, batch_size=self.batch_size, random_state=self.random_state) if validation_views is not None: - val_dataset = DoubleNumpyDataset(validation_views, self.batch_size) + val_dataset = DoubleNumpyDataset(validation_views, self.batch_size, self.random_state) else: val_dataset = None return dataset, val_dataset diff --git a/tests/test_common.py b/tests/test_common.py index 7fabf34e..d910b50e 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -60,7 +60,7 @@ def transform(self, views, y=None): [ GCCA(), CCA_EY(random_state=random_state), - CCA_GHA(random_state=random_state), + # CCA_GHA(random_state=random_state), PLS_EY(random_state=random_state), # PLSStochasticPower(random_state=random_state), GRCCA(),