Source code for lingam.utils._f_correlation

Python implementation of the LiNGAM algorithms.
The LiNGAM Project:

import numpy as np
from scipy.linalg import eigh
from scipy.sparse.linalg import eigsh
from sklearn.utils import check_array

[docs] def f_correlation(x, y): """Implementation of F-correlation [2]_ References ---------- .. [2] Bach, Francis R., and Michael I. Jordan. "Kernel independent component analysis." Journal of machine learning research 3.Jul (2002): 1-48. Parameters ---------- x, y : array-like, shape (n_samples) Data, where ``n_samples`` is the number of samples. Returns ------- float The valus of F-correlation. """ x_ = check_array(x, ensure_2d=False) y_ = check_array(y, ensure_2d=False) n = x_.shape[0] if y_.shape[0] != n: raise ValueError("x and y must be the same size.") # Standardize x_ = (x_ - x_.mean()) / x_.std() y_ = (y_ - y_.mean()) / y_.std() X = [x_, y_] m = len(X) if n > 1000: kappa, sigma = [2e-3, 0.5] else: kappa, sigma = [2e-2, 1.0] Rs = [] Us = [] sizes = [] for i in range(m): # Incomplete Cholesky decomposition G, P = _incomplete_cholesky(X[i], sigma, n * kappa * 1e-2) G = G[np.argsort(P), :] G = G - np.tile(np.mean(G, axis=0), (len(G), 1)) # Singular value decomposition R, U = _svd(G, n, kappa) Rs.append(R) Us.append(U) sizes.append(R.shape[0]) # make R_kappa R_kappa = np.eye(sum(sizes)) st = np.cumsum(np.hstack([0, sizes])) st = st[:m] for i in range(1, m): for j in range(0, i): RUUR = np.diag(Rs[i]) @ (Us[i].T @ Us[j]) @ np.diag(Rs[j]) R_kappa[st[i] : st[i] + sizes[i], st[j] : st[j] + sizes[j]] = RUUR R_kappa[st[j] : st[j] + sizes[j], st[i] : st[i] + sizes[i]] = RUUR.T # smallest eigenvalue of R_kappa w, _ = eigsh(R_kappa, 1, which="SM") return 1 - w[0]
def _squared_dist(x, y): """Squared euclidean distance matrix.""" xx = x * x yy = y * y xy = x.reshape(-1, 1) * y xx = np.tile(xx.reshape(-1, 1), (1, len(yy))) yy = np.tile(yy, (len(xx), 1)) return np.abs(xx + yy - 2 * xy) def _incomplete_cholesky(x, sigma, tol): """Incomplete cholesky decomposition of the gram matrix with the gaussian kernel.""" n = x.shape[0] i = 0 Gjj = np.ones((n)) G = np.empty((n, 0)) P = np.array([i for i in range(n)]) while np.sum(Gjj[i:n]) > tol: G = np.append(G, np.zeros((n, 1)), axis=1) if i > 0: jast = np.argmax(Gjj[i:n]) jast = jast + i P[[i, jast]] = P[[jast, i]] G[[i, jast], :i] = G[[jast, i], :i] else: jast = 0 G[i, i] = Gjj[jast] G[i, i] = np.sqrt(G[i, i]) if i < n: dist = _squared_dist(x[P[(i + 1) : n]], x[[P[i]]]) K = np.exp(-0.5 / sigma**2 * dist) if i > 0: sigG = G[(i + 1) : n, 0:i] @ G[i, 0:i].reshape(-1, 1) G[(i + 1) : n, [i]] = 1 / G[i, i] * (K - sigG) else: G[(i + 1) : n, [i]] = 1 / G[i, i] * K if i < n: Gjj[(i + 1) : n] = np.ones((n - (i + 1))) - np.sum( G[(i + 1) : n, 0 : i + 1] ** 2, axis=1 ) i = i + 1 return G, P def _svd(G, n, kappa): """Singular value decomposition.""" eta = kappa * 1e-2 D, A = eigh(G.T @ G) indexes = np.where(D >= n * eta)[0] order = np.argsort(D[indexes]) order = order[::-1] indexes = indexes[order[0 : len(indexes)]] D = D[indexes] U = G @ (A[:, indexes] @ np.diag(np.sqrt(1.0 / (D)))) R = D for j in range(len(D)): # regularized R[j] = D[j] / (n * kappa / 2 + D[j]) return R, U