Source code for mvlearn.embed.mcca

"""Multiview Canonical Correlation Analysis"""

# Authors: Iain Carmichael, Ronan Perry
# License: MIT

from numbers import Number
import numpy as np
from scipy.linalg import block_diag
from itertools import combinations
from warnings import warn
from sklearn.covariance import ledoit_wolf, oas
from sklearn.utils.validation import check_is_fitted
from sklearn.utils import check_array

from ..utils import check_Xs, param_as_list, eigh_wrapper, svd_wrapper
from .base import BaseCCA, _check_regs, _initial_svds, _deterministic_decomp
from ..compose import SimpleSplitter


class MCCA(BaseCCA):
    r"""
    Multiview canonical correlation analysis for any number of views.
    Includes options for regularized MCCA and informative MCCA (where a
    low rank PCA is first computed).

    Parameters
    ----------
    n_components : int | 'min' | 'max' | None (default 1)
        Number of final components to compute. If `int`, will compute
        that many. If None, will compute as many as possible. 'min' and
        'max' will respectively use the minimum/maximum number of
        features among views.

    regs : float | 'lw' | 'oas' | None, or list, optional (default None)
        MCCA regularization for each data view, which can be important
        for high dimensional data. A list will specify the
        regularization for each view separately. If float, must be
        between 0 and 1 (inclusive).

        - 0 or None : corresponds to SUMCORR-AVGVAR MCCA.
        - 1 : partial least squares SVD (generalizes to more than 2 views)
        - 'lw' : Default ``sklearn.covariance.ledoit_wolf`` regularization
        - 'oas' : Default ``sklearn.covariance.oas`` regularization

    signal_ranks : int, None or list, optional (default None)
        The initial signal rank to compute. If None, will compute the
        full SVD. A list will specify the rank for each view separately.

    center : bool, or list (default True)
        Whether or not to initially mean center the data. A list will
        specify the centering for each view separately.

    i_mcca_method : 'auto' | 'svd' | 'gevp' (default 'auto')
        Whether to use the SVD-based method (which only works with no
        regularization) or the GEVP-based method for informative MCCA.

    multiview_output : bool, optional (default True)
        If True, the ``.transform`` method returns one dataset per view.
        Otherwise, it returns one dataset, of shape
        (n_samples, n_components).

    Attributes
    ----------
    means_ : list of numpy.ndarray
        The means of each view, each of shape (n_features,)

    loadings_ : list of numpy.ndarray
        The loadings for each view used to project new data,
        each of shape (n_features_b, n_components).

    common_score_norms_ : numpy.ndarray, shape (n_components,)
        Column norms of the sum of the fitted view scores.
        Used for projecting new data.

    evals_ : numpy.ndarray, shape (n_components,)
        The generalized eigenvalue problem eigenvalues.

    n_views_ : int
        The number of views.

    n_features_ : list
        The number of features in each fitted view.

    n_components_ : int
        The number of components in each transformed view.

    See also
    --------
    KMCCA

    References
    ----------
    .. [#1mcca] Kettenring, J. R. "Canonical Analysis of Several Sets of
                Variables." Biometrika, 58:433-451, (1971)

    .. [#2mcca] Tenenhaus, A., et al. "Regularized generalized canonical
                correlation analysis." Psychometrika, 76:257-284, 2011

    Examples
    --------
    >>> from mvlearn.embed import MCCA
    >>> X1 = [[0, 0, 1], [1, 0, 0], [2, 2, 2], [3, 5, 4]]
    >>> X2 = [[0.1, -0.2], [0.9, 1.1], [6.2, 5.9], [11.9, 12.3]]
    >>> X3 = [[0, 1, 0], [1, 9, 0], [4, 3, 3], [12, 8, 10]]
    >>> mcca = MCCA()
    >>> mcca.fit([X1, X2, X3])
    MCCA()
    >>> Xs_scores = mcca.transform([X1, X2, X3])
    """

    def __init__(
        self,
        n_components=1,
        regs=None,
        signal_ranks=None,
        center=True,
        i_mcca_method="auto",
        multiview_output=True,
    ):
        self.n_components = n_components
        self.center = center
        self.regs = regs
        self.signal_ranks = signal_ranks
        self.i_mcca_method = i_mcca_method
        self.multiview_output = multiview_output

    def _fit(self, Xs):
        """Helper function for the `.fit` function"""
        Xs, self.n_views_, _, self.n_features_ = check_Xs(
            Xs, return_dimensions=True
        )
        centers = param_as_list(self.center, self.n_views_)
        self.means_ = [np.mean(X, axis=0) if c else None
                       for X, c in zip(Xs, centers)]
        Xs = [X - m if m is not None else X
              for X, m in zip(Xs, self.means_)]

        if self.signal_ranks is not None:
            self.loadings_, scores, common_scores_normed, \
                self.common_score_norms_, self.evals_ = _i_mcca(
                    Xs,
                    signal_ranks=self.signal_ranks,
                    n_components=self.n_components,
                    regs=self.regs,
                    method=self.i_mcca_method,
                )
        else:
            self.loadings_, scores, common_scores_normed, \
                self.common_score_norms_, self.evals_ = _mcca_gevp(
                    Xs, n_components=self.n_components, regs=self.regs
                )

        return scores, common_scores_normed

    def inverse_transform(self, scores):
        """
        Transforms scores back to the original space.

        Parameters
        ----------
        scores : array-like, shape (n_views, n_samples, n_components)
            The CCA scores.

        Returns
        -------
        Xs_hat : list of array-likes
            - Xs_hat length: n_views
            - Xs_hat[i] shape: (n_samples, n_features_i)

            The reconstructed views.
        """
        check_is_fitted(self)
        scores = check_Xs(scores)
        if len(scores) != self.n_views_:
            msg = f"Supplied data must have {self.n_views_} views"
            raise ValueError(msg)
        return [self.inverse_transform_view(score, i)
                for i, score in enumerate(scores)]

    def inverse_transform_view(self, scores, view):
        """
        Transforms scores from a single view back to the original space.

        Parameters
        ----------
        scores : array-like, shape (n_samples, n_components)
            The scores.

        view : int
            The numeric index of the single view X with respect to the
            fitted views.

        Returns
        -------
        X_hat : numpy.ndarray, shape (n_samples, n_features)
            The reconstructed view.
        """
        check_is_fitted(self)
        scores = check_array(scores)
        reconst = scores @ self.loadings_[view].T
        if self.means_[view] is not None:
            reconst += self.means_[view]
        return reconst

    def score(self, Xs, y=None):
        r"""
        Computes the sum of squared reconstruction errors for each view.

        .. math::
            \|\hat{X} - X\|_2^2

        Parameters
        ----------
        Xs : list of array-likes or numpy.ndarray
            - Xs length: n_views
            - Xs[i] shape: (n_samples, n_features_i)

            The views to reconstruct and score.

        y : None
            Ignored variable.

        Returns
        -------
        sse : list of float, length (n_views,), or float
            Sums of squared reconstruction errors, one per view. If
            ``self.multiview_output`` is False, the mean score over the
            views is returned instead.
        """
        Xs_hat = self.transform(Xs)
        Xs_hat = self.inverse_transform(Xs_hat)
        scores = [np.linalg.norm(X - X_hat) ** 2
                  for X, X_hat in zip(Xs, Xs_hat)]
        if self.multiview_output:
            return scores
        else:
            return np.mean(scores)

    def score_view(self, X, view):
        r"""
        Computes the sum of squared reconstruction errors for a view.

        .. math::
            \|\hat{X} - X\|_2^2

        Parameters
        ----------
        X : numpy.ndarray, shape (n_samples, n_features)
            The view to reconstruct and score.

        view : int
            The numeric index of the single view X with respect to the
            fitted views.

        Returns
        -------
        sse : float
            Sum of squared reconstruction errors.
        """
        check_is_fitted(self)
        X_hat = self.transform_view(X, view=view)
        X_hat = self.inverse_transform_view(X_hat, view=view)
        return np.linalg.norm(X - X_hat) ** 2

    @property
    def n_components_(self):
        if hasattr(self, "loadings_"):
            return self.loadings_[0].shape[1]
        else:
            raise AttributeError("Model has not been fitted properly yet")
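

# A hedged usage sketch, demonstration only (not part of the library): fit a
# regularized MCCA on random views, project them, and reconstruct. The shapes,
# seed, and the 'lw' regularizer below are arbitrary illustrative choices.
def _demo_mcca_usage():
    rng = np.random.RandomState(0)
    Xs = [rng.normal(size=(50, 10)), rng.normal(size=(50, 6))]
    mcca = MCCA(n_components=2, regs="lw").fit(Xs)
    scores = mcca.transform(Xs)              # one (50, 2) array per view
    Xs_hat = mcca.inverse_transform(scores)  # reconstruct each view
    sse = [np.linalg.norm(X - Xh) ** 2 for X, Xh in zip(Xs, Xs_hat)]
    return sse  # matches mcca.score(Xs) since multiview_output=True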


def _mcca_gevp(Xs, n_components=None, regs=None):
    """
    Computes multiview canonical correlation analysis via the generalized
    eigenvector formulation of SUMCORR-AVGVAR.

    Parameters
    ----------
    Xs : list of array-likes or numpy.ndarray
        - Xs length: n_views
        - Xs[i] shape: (n_samples, n_features_i)

        The data to fit to.

    n_components : None | int | 'min' | 'max'
        Number of final components to compute.

    regs : float | 'lw' | 'oas' | None, or list, optional (default None)
        MCCA regularization for each data view, which can be important
        for high dimensional data. A list will specify the
        regularization for each view separately. If float, must be
        between 0 and 1 (inclusive).

        - 0 or None : corresponds to SUMCORR-AVGVAR MCCA.
        - 1 : partial least squares SVD (generalizes to more than 2 views)
        - 'lw' : Default ``sklearn.covariance.ledoit_wolf`` regularization
        - 'oas' : Default ``sklearn.covariance.oas`` regularization

    Returns
    -------
    loadings : list of numpy.ndarray
        The loadings for each view used to project new data,
        each of shape (n_features_b, n_components).

    scores : numpy.ndarray, shape (n_views, n_samples, n_components)
        Projections of each data view.

    common_scores_normed : numpy.ndarray, shape (n_samples, n_components)
        Normalized sum of the view scores.

    common_norms : numpy.ndarray, shape (n_components,)
        Column norms of the sum of the view scores.
        Useful for projecting new data.

    gevals : numpy.ndarray, shape (n_components,)
        The generalized eigenvalue problem eigenvalues.
    """
    Xs, _, _, n_features = check_Xs(
        Xs, multiview=True, return_dimensions=True
    )
    n_components = _get_n_components(n_components, n_features)

    # solve the generalized eigenvector problem
    LHS, RHS = _construct_mcca_gevp(Xs=Xs, regs=regs)
    try:
        gevals, ge_loadings = eigh_wrapper(A=LHS, B=RHS, rank=n_components)
    except np.linalg.LinAlgError as e:
        if 'is not positive definite' in str(e):
            raise ValueError(
                "Eigenvalue problem has a singular matrix. Add "
                "regularization (set `regs` to a nonzero value) or reduce "
                "the rank (set `signal_ranks` low enough).")
        else:
            raise e

    # split the stacked loadings into one block of rows per view
    loadings = [load.T for load in SimpleSplitter(n_features).fit_transform(
        ge_loadings.T)]
    scores = [X @ load for X, load in zip(Xs, loadings)]

    # common scores are the unit-norm scaled sum of the view scores;
    # this is also the flag mean of the subspaces spanned by the columns
    # of the views, e.g. see (Draper et al., 2014)
    common_scores = sum(scores)
    common_norms = np.linalg.norm(common_scores, axis=0)
    common_norm_scores = common_scores / common_norms

    # enforce deterministic output due to possible sign flips
    common_scores_normed, scores, loadings = \
        _deterministic_decomp(common_norm_scores, scores, loadings)

    return loadings, np.asarray(scores), common_scores_normed, \
        common_norms, gevals
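

# A minimal NumPy/SciPy sketch (illustrative, not library code) of the GEVP
# solved above. For two centered views the problem reduces to classical CCA:
# the top generalized eigenvalue equals 1 + rho, where rho is the leading
# canonical correlation. Shapes and the seed are arbitrary choices.
def _demo_two_view_gevp():
    from scipy.linalg import eigh
    rng = np.random.RandomState(0)
    X1 = rng.normal(size=(100, 4))
    X1 -= X1.mean(axis=0)
    X2 = rng.normal(size=(100, 3))
    X2 -= X2.mean(axis=0)
    # block matrices of the SUMCORR-AVGVAR GEVP
    LHS = np.block([[X1.T @ X1, X1.T @ X2],
                    [X2.T @ X1, X2.T @ X2]])
    RHS = block_diag(X1.T @ X1, X2.T @ X2)
    gevals = eigh(LHS, RHS, eigvals_only=True)[::-1]  # descending order
    return gevals[0] - 1  # leading canonical correlation rho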


def _i_mcca(Xs, signal_ranks=None, n_components=None, regs=None,
            method="auto"):
    """
    Computes informative multiview canonical correlation analysis,
    e.g. PCA-CCA.

    Parameters
    ----------
    Xs : list of array-likes or numpy.ndarray
        - Xs length: n_views
        - Xs[i] shape: (n_samples, n_features_i)

        The data to fit to.

    signal_ranks : None, int, list
        The initial signal rank to compute, i.e. the rank of the SVD.
        If None, will compute the full SVD. Different values can be
        provided for each view by inputting a list.

    n_components : None | int | 'min' | 'max'
        Number of final components to compute.

    regs : float | 'lw' | 'oas' | None, or list, optional (default None)
        MCCA regularization for each data view, which can be important
        for high dimensional data. A list will specify the
        regularization for each view separately. If float, must be
        between 0 and 1 (inclusive).

        - 0 or None : corresponds to SUMCORR-AVGVAR MCCA.
        - 1 : partial least squares SVD (generalizes to more than 2 views)
        - 'lw' : Default ``sklearn.covariance.ledoit_wolf`` regularization
        - 'oas' : Default ``sklearn.covariance.oas`` regularization

    method : 'auto' | 'svd' | 'gevp', optional (default 'auto')
        Whether to use the SVD-based method (which only works with no
        regularization) or the GEVP-based method.

    Returns
    -------
    loadings : list of numpy.ndarray
        The loadings for each view used to project new data,
        each of shape (n_features_b, n_components).

    scores : numpy.ndarray, shape (n_views, n_samples, n_components)
        Projections of each data view.

    common_scores_normed : numpy.ndarray, shape (n_samples, n_components)
        Normalized sum of the view scores.

    common_norms : numpy.ndarray, shape (n_components,)
        Column norms of the sum of the view scores.
        Useful for projecting new data.

    gevals : numpy.ndarray, shape (n_components,)
        The generalized or flag subspace eigenvalues.
    """
    Xs, n_views, n_samples, n_features = check_Xs(
        Xs, multiview=True, return_dimensions=True
    )
    regs = _check_regs(regs, n_views)

    if method == "auto":
        if any(r is not None for r in regs):
            method = "gevp"
        else:
            method = "svd"

    if method == "svd":
        assert all(r is None for r in regs), \
            "Regularization is not supported by the SVD method."

    # Compute initial SVDs of each view
    use_norm_scores = all(r is None for r in regs)
    bases, init_svds = _initial_svds(
        Xs=Xs, signal_ranks=signal_ranks,
        normalized_scores=use_norm_scores)

    # set n_components
    n_components = _get_n_components(
        n_components, [b.shape[1] for b in bases])

    # Compute MCCA on the reduced data
    if method == "svd":
        # flag mean of the views' left singular bases
        loadings, scores, common_scores_normed, gevals = \
            _flag_mean(bases, n_components=n_components)

        # map the view loadings back into the original feature space
        for b in range(n_views):
            D_b = init_svds[b][1]
            V_b = init_svds[b][2]
            W_b = V_b / D_b
            loadings[b] = W_b @ loadings[b]

        common_norms = np.sqrt(gevals)

    elif method == "gevp":
        # solve the MCCA GEVP on the reduced data
        loadings, scores, common_scores_normed, common_norms, gevals = \
            _mcca_gevp(Xs=bases, n_components=n_components, regs=regs)

        # map the view loadings back into the original feature space
        for b in range(n_views):
            V_b = init_svds[b][2]
            if use_norm_scores:
                D_b = init_svds[b][1]
                W_b = V_b / D_b
            else:
                W_b = V_b
            loadings[b] = W_b @ loadings[b]

    return loadings, scores, common_scores_normed, common_norms, gevals
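

# Illustrative sketch (not library code) of the informative MCCA idea above:
# reduce each view to a rank-k SVD basis, run MCCA there, then map the reduced
# loadings back through W_b = V_b / D_b so they apply to the original
# features. The helper name and the default rank are hypothetical choices.
def _demo_informative_mcca(Xs, rank=3):
    bases, backmaps = [], []
    for X in Xs:
        X = np.asarray(X, dtype=float)
        X = X - X.mean(axis=0)
        U, D, Vt = np.linalg.svd(X, full_matrices=False)
        bases.append(U[:, :rank])                # normalized view scores
        backmaps.append(Vt[:rank].T / D[:rank])  # W_b for the back-mapping
    out = _mcca_gevp(bases)
    loadings = [W @ L for W, L in zip(backmaps, out[0])]
    return (loadings,) + out[1:]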


def _construct_mcca_gevp(Xs, regs=None, as_lists=False):
    r"""
    Constructs the matrices for the MCCA generalized eigenvector problem
    :math:`LHS v = \lambda RHS v`.

    Parameters
    ----------
    Xs : list of array-likes or numpy.ndarray
        The list of data matrices.

    regs : None | float | 'lw' | 'oas' or list of them, shape (n_views)
        As described in ``mvlearn.embed.MCCA``.

    as_lists : bool
        If True, returns LHS and RHS as lists of composing blocks
        instead of their composition into full matrices.

    Returns
    -------
    LHS, RHS : numpy.ndarray, shape (sum_b n_features_b, sum_b n_features_b)
        Left and right hand side matrices for the GEVP.
    """
    Xs, n_views, n_samples, n_features = check_Xs(
        Xs, multiview=True, return_dimensions=True
    )
    regs = _check_regs(regs, n_views)

    LHS = [[None for _ in range(n_views)] for _ in range(n_views)]
    RHS = [None for _ in range(n_views)]

    # cross covariance matrices fill the off-diagonal blocks of the LHS
    for (a, b) in combinations(range(n_views), 2):
        LHS[a][b] = Xs[a].T @ Xs[b]
        LHS[b][a] = LHS[a][b].T

    # view covariance matrices, possibly regularized, fill the diagonals
    for b in range(n_views):
        if regs[b] is None:
            RHS[b] = Xs[b].T @ Xs[b]
        elif isinstance(regs[b], Number):
            RHS[b] = (1 - regs[b]) * Xs[b].T @ Xs[b] + \
                regs[b] * np.eye(n_features[b])
        elif isinstance(regs[b], str):
            if regs[b] == "lw":
                RHS[b] = ledoit_wolf(Xs[b])[0]
            elif regs[b] == "oas":
                RHS[b] = oas(Xs[b])[0]
            # put back on the scale of X^T X, as opposed to the proper
            # covariance estimates returned by these functions
            RHS[b] *= n_samples
        LHS[b][b] = RHS[b]

    if not as_lists:
        LHS = np.block(LHS)
        RHS = block_diag(*RHS)

    return LHS, RHS


def _get_n_components(n_components, n_features):
    """Resolves the n_components parameter given the feature counts."""
    if n_components is None:
        n_components = sum(n_features)
    elif n_components == "min":
        n_components = min(n_features)
    elif n_components == "max":
        n_components = max(n_features)
    elif n_components > sum(n_features):
        warn("Requested too many components. "
             "Setting to the total number of features.")
        n_components = sum(n_features)
    return n_components


def _flag_mean(bases, n_components=None):
    """
    Computes the subspace flag mean.

    Parameters
    ----------
    bases : list
        List of orthonormal basis matrices for each subspace.

    n_components : None, int
        Number of components to compute.

    Returns
    -------
    loadings : list of numpy.ndarray
        The loadings for each view used to project new data,
        each of shape (n_features_b, n_components).

    scores : numpy.ndarray, shape (n_views, n_samples, n_components)
        Projections of each data view.

    flag_mean : numpy.ndarray, shape (ambient_dim, n_components)
        Flag mean orthonormal basis matrix.

    sqsvals : numpy.ndarray, shape (n_components,)
        The squared singular values.

    Notes
    -----
    Given a collection of orthonormal matrices, X_1, ..., X_B, we compute
    the low rank SVD of X := [X_1, ..., X_B]. The left singular vectors
    are the flag mean. We refer to the right singular vectors as the
    "loadings", and to the projection of each view onto its corresponding
    entries of the view loadings as the "scores".

    References
    ----------
    .. [#3mcca] Draper, B., et al. "A flag representation for finite
                collections of subspaces of mixed dimensions." Linear
                Algebra and its Applications, 451:5-32, 2014
    """
    bases, n_views, ambient_dim, subspace_dims = check_Xs(
        bases, multiview=True, return_dimensions=True
    )

    # compute the SVD of the concatenated basis matrix
    flag_mean, svals, loadings = svd_wrapper(
        np.hstack(bases), rank=n_components)
    sqsvals = svals ** 2

    # get the view loadings and scores, split on rows
    loadings = [load.T for load in SimpleSplitter(
        subspace_dims).fit_transform(loadings.T)]
    scores = [b @ l for b, l in zip(bases, loadings)]

    flag_mean, scores, loadings = _deterministic_decomp(
        flag_mean, scores, loadings
    )

    return loadings, np.asarray(scores), flag_mean, sqsvals
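

# Illustrative sketch (not library code): the flag mean is simply the span of
# the top left singular vectors of the horizontally concatenated orthonormal
# bases, as `_flag_mean` computes above. All dimensions below are arbitrary
# demonstration choices.
def _demo_flag_mean(n=20, dims=(3, 2, 4), rank=2):
    rng = np.random.RandomState(0)
    # random orthonormal bases via reduced QR factorizations
    bases = [np.linalg.qr(rng.normal(size=(n, d)))[0] for d in dims]
    U, svals, _ = np.linalg.svd(np.hstack(bases), full_matrices=False)
    flag_mean = U[:, :rank]      # flag mean basis (Draper et al., 2014)
    sqsvals = svals[:rank] ** 2  # matches `sqsvals` from _flag_mean
    return flag_mean, sqsvals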