# License: MIT
from .base import BaseEmbed
from ..utils.utils import check_Xs
import numpy as np
from scipy import linalg, stats
from scipy.sparse.linalg import svds
from sklearn.preprocessing import normalize
from joblib import Parallel, delayed
from .utils import select_dimension
import warnings
def center(X):
r"""
Subtracts the row means and divides by the row standard deviations.
Then subtracts column means.
Parameters
----------
X : array-like, shape (n_observations, n_features)
The data to preprocess
Returns
-------
centered_X : preprocessed data matrix
"""
# Mean along rows using sample mean and sample std
centered_X = stats.zscore(X, axis=1, ddof=1)
# Mean along columns
mu = np.mean(centered_X, axis=0)
centered_X -= mu
return centered_X
[docs]class GCCA(BaseEmbed):
r"""
An implementation of Generalized Canonical Correlation Analysis [#1GCCA]_
suitable for cases where the number of features exceeds the number of
samples by first applying single view dimensionality reduction. Computes
individual projections into a common subspace such that the correlations
between pairwise projections are minimized (ie. maximize pairwise
correlation). An important note: this is applicable to any number of
views, not just two.
Parameters
----------
n_components : int (positive), optional, default=None
If ``self.sv_tolerance=None``, selects the number of SVD
components to keep for each view. If none, another selection
method is used.
fraction_var : float, default=None
If ``self.sv_tolerance=None``, and ``self.n_components=None``,
selects the number of SVD components to keep for each view by
capturing enough of the variance. If none, another selection
method is used.
sv_tolerance : float, optional, default=None
Selects the number of SVD components to keep for each view by
thresholding singular values. If none, another selection
method is used.
n_elbows : int, optional, default: 2
If ``self.fraction_var=None``, ``self.sv_tolerance=None``, and
``self.n_components=None``, then compute the optimal embedding
dimension using :func:`.utils.select_dimension`.
Otherwise, ignored.
tall : boolean, default=False
Set to true if n_samples > n_features, speeds up SVD
max_rank : boolean, default=False
If true, sets the rank of the common latent space as the maximum rank
of the individual spaces. If false, uses the minimum individual rank.
n_jobs : int (positive), default=None
The number of jobs to run in parallel when computing the SVDs for each
view in `fit` and `partial_fit`. `None` means 1 job, `-1` means using
all processors.
Attributes
----------
projection_mats_ : list of arrays
A projection matrix for each view, from the given space to the
latent space
ranks_ : list of ints
Number of left singular vectors kept for each view during the first
SVD
Notes
-----
Consider two views :math:`X_1` and :math:`X_2`. Canonical Correlation
Analysis seeks to find vectors :math:`a_1` and :math:`a_2` to maximize
the correlation :math:`X_1 a_1` and :math:`X_2 a_2`, expanded below.
.. math::
\left(\frac{a_1^TC_{12}a_2}
{\sqrt{a_1^TC_{11}a_1a_2^TC_{22}a_2}}
\right)
where :math:`C_{11}`, :math:`C_{22}`, and :math:`C_{12}` are respectively
the view 1, view 2, and between view covariance matrix estimates. GCCA
maximizes the sum of these correlations across all pairwise views and
computes a set of linearly independent components. This specific algorithm
first applies principal component analysis (PCA) independently to each view
and then aligns the most informative projections to find correlated and
informative subspaces. Parameters that control the embedding dimension
apply to the PCA step. The dimension of each aligned subspace is the
maximum or minimum of the individual dimensions, per the `max_ranks`
parameter. Using the maximum will capture the most information from all
views but also noise from some views. Using the minimum will better remove
noise dimensions but at the cost of information from some views.
References
----------
.. [#1GCCA] B. Afshin-Pour, G.A. Hossein-Zadeh, S.C. Strother, H.
Soltanian-Zadeh. "Enhancing reproducibility of fMRI statistical
maps using generalized canonical correlation analysis in NPAIRS
framework." Neuroimage, volume 60, pp. 1970-1981, 2012
Examples
--------
>>> from mvlearn.datasets import load_UCImultifeature
>>> from mvlearn.embed import GCCA
>>> # Load full dataset, labels not needed
>>> Xs, _ = load_UCImultifeature()
>>> gcca = GCCA(fraction_var = 0.9)
>>> # Transform the first 5 views
>>> Xs_latents = gcca.fit_transform(Xs[:5])
>>> print([X.shape[1] for X in Xs_latents])
[9, 9, 9, 9, 9]
"""
def __init__(
self,
n_components=None,
fraction_var=None,
sv_tolerance=None,
n_elbows=2,
tall=False,
max_rank=False,
n_jobs=None,
):
self.n_components = n_components
self.fraction_var = fraction_var
self.sv_tolerance = sv_tolerance
self.n_elbows = n_elbows
self.tall = tall
self.projection_mats_ = None
self.ranks_ = None
self.max_rank = max_rank
self.n_jobs = n_jobs
def fit(self, Xs, y=None):
r"""
Calculates a projection from each view to a latent space such that
the sum of pairwise latent space correlations is maximized. Each view
'X' is normalized and the left singular vectors of 'X^T X' are
calculated using SVD. The number of singular vectors kept is determined
by either the percent variance explained, a given rank threshold, or a
given number of components. The singular vectors kept are concatenated
and SVD of that is taken and used to calculated projections for each
view.
Parameters
----------
Xs : list of array-likes or numpy.ndarray
- Xs length: n_views
- Xs[i] shape: (n_samples, n_features_i)
The data to fit to. Each view will receive its own embedding.
y : ignored
Included for API compliance.
Returns
-------
self : returns an instance of self.
"""
Xs = check_Xs(Xs, multiview=True)
n = Xs[0].shape[0]
min_m = min(X.shape[1] for X in Xs)
# Parallel center
Xs = Parallel(n_jobs=self.n_jobs)(
delayed(center)(X) for X in Xs
)
# Parallel SVDs
usvr = Parallel(n_jobs=self.n_jobs)(
delayed(self._fit_view)(X, n, min_m) for X in Xs
)
# Reshape from parallel output
self._Uall, self._Sall, self._Vall, self.ranks_ = zip(*usvr)
self = self._fit_multistep()
return self
def partial_fit(self, Xs, reset=False, multiview_step=True):
r"""
Performs like `fit`, but will not overwrite previously fitted single
views and instead uses them as well as the new data. Useful if the data
needs to be processed in batches.
Parameters
----------
Xs : list of array-likes or numpy.ndarray
- Xs length: n_views
- Xs[i] shape: (n_samples, n_features_i)
The data to fit to. Each view will receive its own embedding.
reset : boolean (default = False)
If True, overwrites all prior computations.
multiview_step : boolean, (default = True)
If True, performs the joint SVD step on the results from individual
views. Must be set to True in the final call.
Returns
-------
self : returns an instance of self.
"""
if not hasattr(self, '_Uall') or reset:
self._Uall = []
self._Sall = []
self._Vall = []
self.ranks_ = []
Xs = check_Xs(Xs, multiview=False)
n = Xs[0].shape[0]
min_m = min(X.shape[1] for X in Xs)
# Parallel center
Xs = Parallel(n_jobs=self.n_jobs)(
delayed(center)(X) for X in Xs
)
# Parallel SVDs
usvr = Parallel(n_jobs=self.n_jobs)(
delayed(self._fit_view)(X, n, min_m) for X in Xs
)
# Reshape and concatenate from parallel output
u, s, v, r = zip(*usvr)
self._Uall += u
self._Sall += s
self._Vall += v
self.ranks_ += r
if multiview_step:
if len(self.ranks_) < 2:
msg = "Fewer than two single views fitted. Unable to perform \
multiview step."
warnings.warn(msg, UserWarning)
else:
self = self._fit_multistep()
return self
def _fit_view(self, X, n, min_m):
"""
Helper function to compute SVD on each view.
"""
# Preprocess
X[np.isnan(X)] = 0
# compute the SVD of the data
if self.tall:
v, s, ut = linalg.svd(X.T, full_matrices=False)
else:
u, s, vt = linalg.svd(X, full_matrices=False)
ut = u.T
v = vt.T
# Dimensions to reduce to
if self.sv_tolerance:
if not isinstance(self.sv_tolerance, float) and not isinstance(
self.sv_tolerance, int
):
raise TypeError("sv_tolerance must be numeric")
elif self.sv_tolerance <= 0:
raise ValueError(
"sv_tolerance must be greater than 0"
)
rank = sum(s > self.sv_tolerance)
elif self.n_components:
if not isinstance(self.n_components, int):
raise TypeError("n_components must be an integer")
elif self.n_components <= 0:
raise ValueError(
"n_components must be greater than 0"
)
elif self.n_components > min((n, min_m)):
raise ValueError(
"n_components must be less than or equal to the \
minimum input rank"
)
rank = self.n_components
elif self.fraction_var:
if not isinstance(self.fraction_var, float) and not isinstance(
self.fraction_var, int
):
raise TypeError(
"fraction_var must be an integer or float"
)
elif self.fraction_var <= 0 or self.fraction_var > 1:
raise ValueError("fraction_var must be in (0,1]")
s2 = np.square(s)
rank = sum(np.cumsum(s2 / sum(s2)) < self.fraction_var) + 1
else:
# Sweep over only first log2, else too large elbows
s = s[: int(np.ceil(np.log2(np.min(X.shape))))]
elbows, _ = select_dimension(
s, n_elbows=self.n_elbows, threshold=None
)
rank = elbows[-1]
u = ut.T[:, :rank]
return u, s, v, rank
def _fit_multistep(self):
"""
Helper function to compute the SVD on the results from individuals
view SVDs.
"""
if self.max_rank:
d = max(self.ranks_)
else:
d = min(self.ranks_)
# Create a concatenated view of Us
Uall_c = np.concatenate(self._Uall, axis=1)
_, _, VV = svds(Uall_c, d)
VV = np.flip(VV.T, axis=1)
VV = VV[:, : min([d, VV.shape[1]])]
# SVDS the concatenated Us
idx_end = 0
projection_mats = []
n = len(self.ranks_)
for i in range(n):
idx_start = idx_end
idx_end = idx_start + self.ranks_[i]
VVi = normalize(VV[idx_start:idx_end, :], "l2", axis=0)
# Compute the canonical projections
A = np.sqrt(n - 1) * self._Vall[i][:, : self.ranks_[i]]
A = A @ (linalg.solve(
np.diag(self._Sall[i][: self.ranks_[i]]), VVi
))
projection_mats.append(A)
self.projection_mats_ = projection_mats
return self
def transform(self, Xs, view_idx=None):
r"""
Embeds data matrix(s) using the fitted projection matrices. May be
used for out-of-sample embeddings.
Parameters
----------
Xs : list of array-likes or numpy.ndarray
- Xs length: n_views
- Xs[i] shape: (n_samples, n_features_i)
A list of data matrices from each view to transform based on the
prior fit function. If view_idx is defined, then Xs is a 2D data
matrix corresponding to a single view.
view_idx : int, default=None
For transformation of a single view. If not None, then Xs is 2D
and views_idx specifies the index of the view from which Xs comes
from.
Returns
-------
Xs_transformed : list of array-likes or array-like
Same shape as Xs
"""
if self.projection_mats_ is None:
raise RuntimeError("Must call fit function before transform")
Xs = check_Xs(Xs)
if view_idx is not None:
return center(Xs[0]) @ self.projection_mats_[view_idx]
else:
return np.array(
[
center(X) @ proj
for X, proj in zip(Xs, self.projection_mats_)
]
)