"""Kernel Multiview Canonical Correlation Analysis"""
# Authors: Iain Carmichael, Ronan Perry
# License: MIT
import numpy as np
from itertools import combinations
from warnings import warn
from sklearn.metrics import pairwise_kernels
from ..utils import check_Xs, param_as_list
from ..compose import SimpleSplitter
from ..utils import eigh_wrapper
from .base import BaseCCA, _check_regs, _initial_svds, _deterministic_decomp
from sklearn.utils.validation import check_is_fitted
from sklearn.utils import check_array
class KMCCA(BaseCCA):
r"""
    Kernel multi-view canonical correlation analysis.

    Parameters
    ----------
n_components : int, None (default 1)
Number of components to compute. If None, will use the number of
features.
kernel : str, callable, or list (default 'linear')
The kernel function to use. This is the metric argument to
``sklearn.metrics.pairwise.pairwise_kernels``. A list will
specify for each view separately.
    kernel_params : dict, or list (default {})
        Keyword arguments to ``sklearn.metrics.pairwise.pairwise_kernels``.
        A list will specify for each view separately.
    regs : float, None, or list, optional (default None)
        None equates to 0. Floats are nonnegative. The value is used to
        regularize singular values in each view based on ``diag_mode``.
        A list will specify the value for each view separately.
signal_ranks : int, None, or list, optional (default None)
Largest SVD rank to compute for each view. If None, the full rank
decomposition will be used. A list will specify for each view
separately.
    sval_thresh : float, or list (default 1e-3)
        For each view, singular values of :math:`(1/n)K`, the gram matrix
        scaled by ``1 / n_samples``, that fall below this threshold are
        discarded. A nonzero value deals with singular gram matrices.
    diag_mode : 'A' | 'B' | 'C' (default 'A')
        Method of regularizing singular values `s` with regularization
        parameter `r`

        - 'A' : :math:`(1 - r) K^2 + r K` [#1kmcca]_
        - 'B' : :math:`(1 - r) \left(K + \frac{n}{2} \kappa I\right)^2` where
          :math:`\kappa = r / (1 - r)` [#2kmcca]_
        - 'C' : :math:`(1 - r) K^2 + r I_n` [#3kmcca]_
center : bool, or list (default True)
Whether or not to initially mean center the data. A list will
specify for each view separately.
filter_params : bool (default False)
See ``sklearn.metrics.pairwise.pairwise_kernels`` documentation.
n_jobs : int, None, optional (default None)
Number of jobs to run in parallel when computing kernel matrices.
See ``sklearn.metrics.pairwise.pairwise_kernels`` documentation.
multiview_output : bool, optional (default True)
If True, the ``.transform`` method returns one dataset per view.
Otherwise, it returns one dataset, of shape (n_samples, n_components)
pgso : bool, optional (default False)
If True, computes a partial Gram-Schmidt orthogonalization
approximation of the kernel matrices to the given tolerance.
    tol : float (default 0.1)
        Stopping tolerance for PGSO: iteration stops once the trace of the
        residual between a kernel matrix and its PGSO approximation,
        relative to the kernel trace, falls below this value.

    Attributes
    ----------
kernel_col_means_ : list of numpy.ndarray, shape (n_samples,)
The column means of each gram matrix
kernel_mat_means_ : list
The total means of each gram matrix
dual_vars_ : numpy.ndarray, shape (n_views, n_samples, n_components)
The loadings for the gram matrix of each view
common_score_norms_ : numpy.ndarray, shape (n_components,)
Column norms of the sum of the view scores.
Useful for projecting new data
evals_ : numpy.ndarray, shape (n_components,)
The generalized eigenvalue problem eigenvalues.
Xs_ : list of numpy.ndarray, length (n_views,)
- Xs[i] shape (n_samples, n_features_i)
The original data matrices for use in kernel matrix computation
during calls to ``.transform``.
n_views_ : int
The number of views
n_features_ : list
The number of features in each fitted view
n_components_ : int
The number of components in each transformed view
pgso_Ls_ : list of numpy.ndarray, length (n_views,)
- pgso_Ls_[i] shape (n_samples, rank_i)
The Gram-Schmidt approximations of the kernel matrices
pgso_norms_ : list of numpy.ndarray, length (n_views,)
- pgso_norms_[i] shape (rank_i,)
The maximum norms found during the Gram-Schmidt procedure, descending
pgso_idxs_ : list of numpy.ndarray, length (n_views,)
- pgso_idxs_[i] shape (rank_i,)
The sample indices of the maximum norms
pgso_Xs_ : list of numpy.ndarray, length (n_views,)
- pgso_Xs_[i] shape (rank_i, n_features)
The samples with indices saved in pgso_idxs_, sorted by pgso_norms_
pgso_ranks_ : list, length (n_views,)
The ranks of the partial Gram-Schmidt results for each view.

    Notes
    -----
    Traditional CCA aims to find useful projections of features in each view
    of data, computing a weighted sum, but may not extract useful descriptors
    of the data because of its linearity. KMCCA offers an alternative solution
    by first projecting the data onto a higher dimensional feature space

    .. math::
        \phi: \mathbf{x} = (x_1, \ldots, x_m) \mapsto
        \phi(\mathbf{x}) = (z_1, \ldots, z_N), \quad (m \ll N)

    before performing CCA in the new feature space.
    Kernels are effectively distance functions that compute inner products in
    the higher dimensional feature space, a method known as the kernel trick.
    A kernel function :math:`K` satisfies, for all :math:`\mathbf{x},
    \mathbf{z} \in X`,

    .. math::
        K(\mathbf{x}, \mathbf{z}) = \langle \phi(\mathbf{x}),
        \phi(\mathbf{z}) \rangle.

    The kernel matrix :math:`K_i` has entries computed from the kernel
    function. Using the kernel trick, loadings of the kernel matrix
    (``dual_vars_``) are solved for rather than loadings of the features
    from :math:`\phi`.
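
    For example (an illustration, not the default), the RBF kernel

    .. math::
        K(\mathbf{x}, \mathbf{z}) =
        \exp(-\gamma \|\mathbf{x} - \mathbf{z}\|^2)

    corresponds to an infinite-dimensional feature map :math:`\phi`, so the
    projection is never formed explicitly; any metric accepted by
    ``sklearn.metrics.pairwise.pairwise_kernels`` may be used.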
    Kernel matrices grow quadratically with the number of samples: not only
    must :math:`n^2` elements be stored, but matrix eigenvalue problems of
    that size must be solved. In a Cholesky decomposition a positive definite
    matrix :math:`K` is decomposed into a lower triangular matrix :math:`L`:
    :math:`K = LL^T`.
    The dual partial Gram-Schmidt orthogonalization (PGSO) is equivalent to
    the Incomplete Cholesky Decomposition (ICD), which looks for a low rank
    approximation of :math:`L`, reducing the cost of operations on the
    matrix, such that :math:`\frac{1}{\sum_i K_{ii}} tr(K - LL^T) \leq tol`.
    A PGSO tolerance yielding rank :math:`m` reduces storage requirements
    from :math:`O(n^2)` to :math:`O(mn)` and computation from :math:`O(n^3)`
    to :math:`O(nm^2)` [#3kmcca]_; see the PGSO example below.

    See also
    --------
MCCA

    References
    ----------
.. [#1kmcca] Hardoon D., et al. "Canonical Correlation Analysis: An
Overview with Application to Learning Methods", Neural
Computation, Volume 16 (12), pp 2639-2664, 2004.
.. [#2kmcca] Bach, F. and Jordan, M. "Kernel Independent Component
Analysis." Journal of Machine Learning Research, 3:1-48, 2002.
    .. [#3kmcca] Kuss, M. and Graepel, T. "The Geometry of Kernel Canonical
                 Correlation Analysis." MPI Technical Report, 108. (2003).

    Examples
    --------
>>> from mvlearn.embed import KMCCA
>>> X1 = [[0, 0, 1], [1, 0, 0], [2, 2, 2], [3, 5, 4]]
>>> X2 = [[0.1, -0.2], [0.9, 1.1], [6.2, 5.9], [11.9, 12.3]]
>>> X3 = [[0, 1, 0], [1, 9, 0], [4, 3, 3,], [12, 8, 10]]
>>> kmcca = KMCCA()
>>> kmcca.fit([X1, X2, X3])
KMCCA()
>>> Xs_scores = kmcca.transform([X1, X2, X3])
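
    A minimal PGSO sketch; the RBF kernel and tolerance here are
    illustrative assumptions, not defaults:

    >>> kmcca_pgso = KMCCA(kernel='rbf', pgso=True, tol=0.1)
    >>> _ = kmcca_pgso.fit([X1, X2, X3])
    >>> Xs_scores_pgso = kmcca_pgso.transform([X1, X2, X3])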
"""
def __init__(
self,
n_components=1,
kernel="linear",
kernel_params={},
regs=None,
signal_ranks=None,
sval_thresh=1e-3,
diag_mode="A",
center=True,
filter_params=False,
n_jobs=None,
multiview_output=True,
pgso=False,
tol=0.1
):
self.n_components = n_components
self.center = center
self.regs = regs
self.kernel = kernel
self.kernel_params = kernel_params
self.signal_ranks = signal_ranks
self.sval_thresh = sval_thresh
self.diag_mode = diag_mode
self.filter_params = filter_params
self.n_jobs = n_jobs
self.multiview_output = multiview_output
self.pgso = pgso
self.tol = tol
def _fit(self, Xs):
"""Helper method for `.fit` function"""
Xs, self.n_views_, _, self.n_features_ = check_Xs(
Xs, multiview=True, return_dimensions=True)
centers = param_as_list(self.center, self.n_views_)
# set up (centered) kernels
self.kernel_col_means_ = [None] * self.n_views_
self.kernel_mat_means_ = [None] * self.n_views_
Ks = []
if self.pgso:
self.pgso_Ls_ = []
self.pgso_norms_ = []
self.pgso_idxs_ = []
self.pgso_Xs_ = []
self.pgso_ranks_ = []
else:
self.Xs_ = Xs # stored for `transform` step
for view in range(self.n_views_):
K = self._get_kernel(Xs[view], view=view)
if self.pgso:
L, maxs, idxs = _pgso_decomp(K, self.tol)
self.pgso_Ls_.append(L)
self.pgso_norms_.append(maxs)
self.pgso_idxs_.append(idxs)
self.pgso_Xs_.append(Xs[view][idxs])
self.pgso_ranks_.append(len(maxs))
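                # reconstruct the (approximate) gram matrix from its
                # low-rank PGSO factor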
K = L @ L.T
del L
if centers[view]:
K, col_mean, mat_mean = _center_kernel(K)
self.kernel_col_means_[view] = col_mean
self.kernel_mat_means_[view] = mat_mean
Ks.append(K)
self.dual_vars_, scores, common_scores_normed, \
self.common_score_norms_, self.evals_ = _kmcca_gevp(
Ks,
signal_ranks=param_as_list(self.signal_ranks, self.n_views_),
sval_thresh=self.sval_thresh,
n_components=self.n_components,
regs=self.regs,
diag_mode=self.diag_mode,
)
return scores, common_scores_normed
def transform_view(self, X, view):
"""
        Transform a view, projecting it using fitted loadings.

        Parameters
        ----------
X : numpy.ndarray, shape (n_samples, n_features)
The view to transform
view : int
The numeric index of the single view X with respect to the fitted
views.

        Returns
        -------
X_scores : numpy.ndarray, shape (n_samples, n_components)
Transformed view
"""
check_is_fitted(self)
X = check_array(X)
if self.pgso:
K = self._get_kernel(X, view=view, Y=self.pgso_Xs_[view])
L = _pgso_construct(K, self.pgso_Ls_[view],
self.pgso_idxs_[view], self.pgso_norms_[view])
K = L @ self.pgso_Ls_[view].T
else:
K = self._get_kernel(X, view=view, Y=self.Xs_[view])
if self.kernel_col_means_[view] is not None:
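            # out-of-sample centering: subtract the fitted column means and
            # this kernel's row means (taken over the fitted samples), then
            # add back the fitted overall mean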
            row_means = np.sum(K, axis=1)[:, np.newaxis] / K.shape[1]
K -= self.kernel_col_means_[view]
K -= row_means
K += self.kernel_mat_means_[view]
return np.dot(K, self.dual_vars_[view])
def _get_kernel(self, X, view, Y=None):
"""
        Returns a gram (kernel) matrix between a set of observations and
        itself, or between it and a second set of observations.

        Parameters
        ----------
X : numpy.ndarray, shape (n, d)
The data matrix
view : int
The view index, for the kernel parameter selection
Y : numpy.ndarray, shape (m, d), optional (default None)
Second data matrix

        Returns
        -------
K : numpy.ndarray, shape (n, n) or (n, m) if Y provided
The kernel matrix with entries from the kernel function
"""
if isinstance(self.kernel, list):
kernel = self.kernel[view]
else:
kernel = self.kernel
if isinstance(self.kernel_params, list):
kernel_params = self.kernel_params[view]
else:
kernel_params = self.kernel_params
return pairwise_kernels(
X, Y, metric=kernel, filter_params=self.filter_params,
n_jobs=self.n_jobs, **kernel_params)
@property
def n_components_(self):
if hasattr(self, "dual_vars_"):
return self.dual_vars_[0].shape[1]
else:
raise AttributeError("Model has not been fitted properly yet")
def _pgso_decomp(K, tol=0.1):
r"""
    Performs the partial Gram-Schmidt orthogonalization procedure.

    Parameters
    ----------
K : numpy.ndarray, shape (n_samples, n_samples)
The kernel matrix to approximate
tol : float, optional (default 0.1)
The tolerance of the kernel matrix approximation

    Returns
    -------
L : numpy.ndarray, shape (n_samples, r)
The lower triangular approximation matrix
    maxs : numpy.ndarray, shape (r,)
        Square roots of the maximum residual diagonal elements of K at
        each Gram-Schmidt iteration, in descending order
    indices : numpy.ndarray, shape (r,)
        The sample indices of each of the values in `maxs`.

    Notes
    -----
    The procedure iteratively constructs a lower triangular matrix L and
    stops when the relative trace of the residual falls below the
    tolerance, i.e.

    .. math::
        \frac{1}{\sum_i K_{ii}} tr(K - LL^T) \leq tol
"""
K = check_array(K)
N = K.shape[0]
norm2 = K.diagonal().copy()
norm2_sum = sum(norm2)
L = np.zeros(K.shape)
max_sizes = []
max_indices = []
M = 0
for j in range(N):
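        # greedy pivot: pick the sample with the largest residual
        # diagonal entry (squared norm not yet captured by L)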
max_idx = np.argmax(norm2)
max_indices.append(max_idx)
max_sizes.append(np.sqrt(norm2[max_idx]))
L[:, j] = (
K[:, max_idx] - L[:, :j] @ L[max_idx, :j].T
) / max_sizes[-1]
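        # subtract the newly explained component from the residual diagonal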
norm2 -= L[:, j]**2
M = j+1
if sum(norm2) / norm2_sum < tol:
break
return L[:, :M], np.asarray(max_sizes), np.asarray(max_indices)
def _pgso_construct(K, L, indices, maxs):
"""
Constructs features for samples from a partial Gram-Schmidt
orthogonalization on different samples
Parameters
----------
K : numpy.ndarray, shape (n_new_samples, n_new_samples)
The kernel matrix of new samples to approximate
L : numpy.ndarray, shape (n_samples, r)
Lower triangular Gram-Schmidt decomposition of a kernel matrix
indices : numpy.ndarray, shape (n_features,)
The sample indices of the maximums
maxs : numpy.ndarray, shape (n_features,)
The maximum diagonal elements during construction of L
Returns
-------
L_oos : numpy.ndarray, shape (n_samples, r)
The lower triangular approximation matrix
"""
L_oos = np.zeros((K.shape[0], L.shape[1]))
for j in range(L.shape[1]):
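        # same recursion as in the fit, but rows come from the new samples
        # while the pivot rows L[indices[j], :j] come from the fitted L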
L_oos[:, j] = (
K[:, j] - L_oos[:, :j] @ L[indices[j], :j].T) / maxs[j]
return L_oos
def _kmcca_gevp(
Ks,
signal_ranks=None,
n_components=None,
regs=None,
sval_thresh=1e-3,
diag_mode="A",
):
"""
    Computes kernel MCCA using the eigenvector formulation, where matrix
    square roots are computed via SVDs. Throwing out zero singular values
    of the kernel views avoids singularity issues.

    Parameters
    ----------
Ks : list of numpy.ndarray, length (n_views,)
- Ks[i] shape (n_samples, n_samples)
List of kernel matrices
n_components : int, None
Number of components to compute. If None, will use the number of
features.
    regs : float, None, or list
        None equates to 0. Floats are nonnegative. The value is used to
        regularize singular values in each view based on `diag_mode`.
        A list will specify the value for each view separately.
signal_ranks : int, None, or list
Largest SVD rank to compute for each view. If None, the full rank
decomposition will be used. A list will specify for each view
separately.
    sval_thresh : float, (default 1e-3)
        For each view, singular values of (1/n)K, the gram matrix scaled
        by ``1 / n_samples``, that fall below this threshold are discarded.
        A nonzero value deals with singular gram matrices.
    diag_mode : 'A' | 'B' | 'C' (default 'A')
        Method of regularizing singular values `s` with regularization
        parameter `r`

        - 'A' : (1 - r) * K^2 + r * K [#1kmcca]_
        - 'B' : (1 - r) * (K + (n/2) * kappa * I)^2 where
          kappa = r / (1 - r) [#2kmcca]_
        - 'C' : (1 - r) * K^2 + r * I [#3kmcca]_

    Returns
    -------
    dual_vars : numpy.ndarray, shape (n_views, n_samples, n_components)
        The loadings for each view, used to project new data.
scores : numpy.ndarray, shape (n_views, n_samples, n_components)
Projections of each data view.
common_scores_normed : numpy.ndarray, shape (n_samples, n_components)
Normalized sum of the view scores.
common_norms : numpy.ndarray, shape (n_components,)
Column norms of the sum of the view scores.
Useful for projecting new data
gevals : numpy.ndarray, shape (n_components,)
The generalized eigenvalue problem eigenvalues.
"""
    if sval_thresh is not None:
        # rescale sval_thresh from the scale of (1/n)K to the scale of K
        sval_thresh *= Ks[0].shape[0]
Us, svds = _initial_svds(Ks,
signal_ranks=signal_ranks,
normalized_scores=True,
sval_thresh=sval_thresh)
svals = [svd[1] for svd in svds]
Us, n_views, n_samples, n_features_reduced = check_Xs(
Us, return_dimensions=True)
regs = _check_regs(regs=regs, n_views=n_views)
if n_components is None:
n_components = sum(n_features_reduced)
if n_components > sum(n_features_reduced):
warn("Requested too many components!")
n_components = min(n_components, sum(n_features_reduced))
# get singular values of transformed view gram matrices
reg_svals = _regularize_svals(
svals=svals, regs=regs, diag_mode=diag_mode, n_samples=n_samples
)
    # construct the block matrix for the eigendecomposition
C = [[None for _ in range(n_views)] for _ in range(n_views)]
for b in range(n_views):
C[b][b] = np.eye(n_features_reduced[b])
for (a, b) in combinations(range(n_views), 2):
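        # whiten each view's scores: q = s / sqrt(t) rescales the
        # normalized SVD scores by the regularized singular values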
U_a = Us[a]
s_a = svals[a]
t_a = reg_svals[a]
q_a = s_a * (1 / np.sqrt(t_a))
U_b = Us[b]
s_b = svals[b]
t_b = reg_svals[b]
q_b = s_b * (1 / np.sqrt(t_b))
C[a][b] = (U_a * q_a).T @ (U_b * q_b)
C[b][a] = C[a][b].T
C = np.block(C)
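    # the top eigenvectors of this block matrix solve the KMCCA
    # generalized eigenvalue problem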
gevals, gevecs = eigh_wrapper(A=C, rank=n_components)
gevecs = [gevec.T for gevec in SimpleSplitter(
n_features_reduced).fit_transform(gevecs.T)]
dual_vars = [(Us[b] / np.sqrt(reg_svals[b])) @ gevecs[b]
for b in range(n_views)]
scores = [Ks[b] @ dual_vars[b] for b in range(n_views)]
common_scores = sum(scores)
common_norms = np.linalg.norm(common_scores, axis=0)
common_scores_normed = common_scores / common_norms
# enforce deterministic output due to possible sign flips
common_scores_normed, scores, dual_vars = _deterministic_decomp(
common_scores_normed, scores, dual_vars)
return np.asarray(dual_vars), np.asarray(scores), common_scores_normed, \
common_norms, gevals
def _regularize_svals(svals, regs=None, diag_mode="A",
n_samples=None):
"""
    Regularizes singular values for various mode options.

    Parameters
    ----------
svals : list of numpy.ndarray
Singular values of each kernel matrix.
regs : None, float, or list
Regularization parameter for each view.
diag_mode : 'A' | 'B' | 'C'
Method of regularizing singular values `s` with regularization
parameter `r`
n_samples : None, int
Number of samples. Needed for mode B.

    Returns
    -------
reg_svals : list of array-like
Regularized singular values for each view and given diag_mode.
"""
n_views = len(svals)
reg_svals = [None for b in range(n_views)]
for b in range(n_views):
s = np.array(svals[b])
if regs is None:
r = None
else:
r = regs[b]
if r is None or r == 0:
t = s ** 2
elif diag_mode == "A":
t = (1 - r) * s ** 2 + r * s
elif diag_mode == "B":
assert n_samples is not None
if np.isclose(r, 1):
t = s
else:
kappa = r / (1 - r)
t = (1 - r) * (s + 0.5 * n_samples * kappa) ** 2
elif diag_mode == "C":
t = (1 - r) * s ** 2 + r
reg_svals[b] = t
return reg_svals
def _center_kernel(K):
"""
    Centers a kernel matrix.

    Parameters
    ----------
K : np.ndarray, shape (n,n)
A kernel matrix

    Returns
    -------
K_c : numpy.ndarray, shape (n,n)
The centered kernel matrix
col_mean : numpy.ndarray, shape (n,)
Mean of each kernel matrix column
mat_mean : float
Mean of entire kernel matrix
"""
col_mean = np.mean(K, axis=0)
mat_mean = np.mean(col_mean)
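    # double centering: K_c = H K H with H = I - (1/n) 1 1^T; since K is
    # symmetric, the row and column means coincide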
K_c = K - col_mean - col_mean[:, np.newaxis] + mat_mean
return K_c, col_mean, mat_mean