# Source code for lingam.utils._rcd
import itertools
import numpy as np
from scipy.optimize import fmin_l_bfgs_b
from scipy.stats import pearsonr, shapiro
from sklearn.linear_model import LinearRegression
from sklearn.utils import check_array
from ..hsic import get_gram_matrix, get_kernel_width, hsic_test_gamma, hsic_teststat
# [docs]
def extract_ancestors(
    X,
    max_explanatory_num=2,
    cor_alpha=0.01,
    ind_alpha=0.01,
    shapiro_alpha=0.01,
    MLHSICR=True,
    bw_method="mdbs",
):
    """Extract a set of ancestors of each variable.

    Implementation of RCD Algorithm 1 [1]_

    References
    ----------
    .. [1] T.N.Maeda and S.Shimizu. RCD: Repetitive causal discovery of
       linear non-Gaussian acyclic models with latent confounders.
       In Proc. 23rd International Conference on Artificial Intelligence and
       Statistics (AISTATS2020), Palermo, Sicily, Italy. PMLR 108:735-745, 2020.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        Training data, where ``n_samples`` is the number of samples
        and ``n_features`` is the number of features.
    max_explanatory_num : int, optional (default=2)
        Maximum number of explanatory variables.
    cor_alpha : float, optional (default=0.01)
        Alpha level for pearson correlation.
    ind_alpha : float, optional (default=0.01)
        Alpha level for HSIC.
    shapiro_alpha : float, optional (default=0.01)
        Alpha level for Shapiro-Wilk test.
    MLHSICR : bool, optional (default=True)
        If True, use MLHSICR for multiple regression, if False, use OLS
        for multiple regression.
    bw_method : str, optional (default=``mdbs``)
        The method used to calculate the bandwidth of the HSIC.

        * ``mdbs`` : Median distance between samples.
        * ``scott`` : Scott's Rule of Thumb.
        * ``silverman`` : Silverman's Rule of Thumb.

    Returns
    -------
    M : list of set
        ``M[i]`` is the set of variable indices identified as ancestors
        of variable ``i``.
    """
    # Validate/convert the input BEFORE reading .shape, so list-of-lists
    # input is accepted (the original read X.shape first and would crash).
    X = check_array(X)
    n_features = X.shape[1]
    # M[i] accumulates the discovered ancestors of variable i.
    M = [set() for _ in range(n_features)]
    # Number of explanatory variables currently considered (|U| - 1).
    # Renamed from the ambiguous single letter ``l`` (PEP8 E741).
    level = 1
    # Last common-ancestor set seen for each candidate tuple U; lets us
    # skip U when nothing relevant changed since the previous visit.
    hu_history = {}
    while True:
        changed = False
        U_list = [
            sorted(U) for U in itertools.combinations(range(n_features), level + 1)
        ]
        for U in U_list:
            # Get the set of common ancestors of U.
            H_U = get_common_ancestors(M, U)
            if tuple(U) in hu_history and H_U == hu_history[tuple(U)]:
                continue
            # Remove the influence of the common ancestors by regression.
            Y = get_residual_matrix(X, U, H_U)
            # Shapiro-Wilk test: every residual in U must be non-Gaussian.
            if not is_non_gaussianity(Y, U, shapiro_alpha):
                continue
            # Pearson's correlation: every pair in U must be correlated.
            is_cor = True
            for xi, xj in itertools.combinations(U, 2):
                if not is_correlated(Y[:, xi], Y[:, xj], cor_alpha):
                    is_cor = False
                    break
            if not is_cor:
                continue
            # Collect the "sink" candidates of U: variables whose multiple
            # regression residuals are independent of the other members.
            sink_set = []
            for xi in U:
                xj_list = list(set(U) - set([xi]))
                if exists_ancestor_in_U(M, U, xi, xj_list):
                    continue
                # Check whether the residuals obtained from multiple
                # regressions are independent.
                if is_independent_of_resid(
                    Y, xi, xj_list, ind_alpha, bw_method, MLHSICR
                ):
                    sink_set.append(xi)
            # A unique sink means the remaining members of U are ancestors
            # of that sink; record any newly discovered ancestors.
            if len(sink_set) == 1:
                xi = sink_set[0]
                xj_list = list(set(U) - set(sink_set))
                if not set(xj_list) <= M[xi]:
                    M[xi] = M[xi] | set(xj_list)
                    changed = True
            hu_history[tuple(U)] = H_U
        if changed:
            # New ancestors found: restart the search from pairs.
            level = 1
        elif level < max_explanatory_num:
            level += 1
        else:
            break
    return M
def get_common_ancestors(M, U):
    """Return the set of ancestors shared by every variable in U."""
    ancestor_sets = (M[idx] for idx in U)
    return set.intersection(*ancestor_sets)
def get_residual_matrix(X, U, H_U):
    """Regress each variable in U on the common ancestors H_U.

    Returns the matrix of residuals for the columns in U (other columns
    are left as zeros); when H_U is empty, X itself is returned unchanged.
    """
    if not H_U:
        return X
    residual_matrix = np.zeros_like(X)
    exog = list(H_U)
    for idx in U:
        residual_matrix[:, idx], _ = get_resid_and_coef(X, idx, exog)
    return residual_matrix
def is_non_gaussianity(Y, U, alpha):
    """Return True when every variable in U rejects Gaussianity.

    Each column is tested with the Shapiro-Wilk test; all p-values must
    fall at or below ``alpha`` for the set to be judged non-Gaussian.
    """
    return all(shapiro(Y[:, idx])[1] <= alpha for idx in U)
def is_correlated(a, b, alpha):
"""Estimate that the two variables are linearly correlated using the Pearson's correlation"""
return pearsonr(a, b)[1] < alpha
def exists_ancestor_in_U(M, U, xi, xj_list):
    """Return True when an ancestor relation within U already rules xi out.

    Either xi is a known ancestor of some xj (so xi cannot be a sink), or
    every xj is already a known ancestor of xi (nothing new to learn).
    """
    if any(xi in M[xj] for xj in xj_list):
        return True
    return set(xj_list).issubset(M[xi])
def is_independent_of_resid(Y, xi, xj_list, alpha, bw_method, MLHSICR):
    """Check whether the residuals obtained from multiple regressions are
    independent of every explanatory variable in ``xj_list``."""
    n_samples = Y.shape[0]

    def _resid_independent_of_all(resid):
        # HSIC test of the residual against each explanatory variable;
        # stop at the first dependent pair.
        for xj in xj_list:
            independent = is_independent(
                np.reshape(resid, [n_samples, 1]),
                np.reshape(Y[:, xj], [n_samples, 1]),
                bw_method,
                alpha,
            )
            if not independent:
                return False
        return True

    # First attempt: multiple regression with OLS.
    ols_resid, _ = get_resid_and_coef(Y, xi, xj_list)
    if _resid_independent_of_all(ols_resid):
        return True
    # With a single regressor, or when MLHSICR is disabled, stop here.
    if len(xj_list) == 1 or MLHSICR is False:
        return False

    # Second attempt: multiple regression with MLHSICR.
    mlhsicr_resid, _ = get_resid_and_coef_by_MLHSICR(Y, xi, xj_list)
    return _resid_independent_of_all(mlhsicr_resid)
def get_resid_and_coef(X, endog_idx, exog_idcs):
    """Fit OLS of column ``endog_idx`` on columns ``exog_idcs``.

    Returns the residual vector and the fitted regression coefficients.
    """
    exog = X[:, exog_idcs]
    endog = X[:, endog_idx]
    model = LinearRegression()
    model.fit(exog, endog)
    residuals = endog - model.predict(exog)
    return residuals, model.coef_
def get_resid_and_coef_by_MLHSICR(Y, xi, xj_list):
    """Get the residuals and coefficients by minimizing the sum of HSICs using the L-BFGS method."""
    n_samples = Y.shape[0]
    width_list = []
    Lc_list = []
    # Precompute the kernel width and centered Gram matrix of each regressor;
    # these are fixed throughout the optimization.
    for xj in xj_list:
        yj = np.reshape(Y[:, xj], [n_samples, 1])
        width_xj = get_kernel_width(yj)
        _, Lc = get_gram_matrix(yj, width_xj)
        width_list.append(width_xj)
        Lc_list.append(Lc)
    # Start the optimization from the OLS solution.
    _, initial_coef = get_resid_and_coef(Y, xi, xj_list)
    width_xi = get_kernel_width(np.reshape(Y[:, xi], [n_samples, 1]))
    # Calculate the sum of the Hilbert-Schmidt independence criterion
    def sum_empirical_hsic(coef):
        # Objective: sum of HSIC statistics between the residual of
        # Y[:, xi] (given coef) and each regressor in xj_list.
        resid = Y[:, xi]
        width = width_xi
        for j, xj in enumerate(xj_list):
            resid = resid - coef[j] * Y[:, xj]
            # NOTE(review): the kernel width is updated linearly in the
            # coefficients, mirroring the residual update — confirm this
            # matches the reference RCD implementation.
            width = width - coef[j] * width_list[j]
        _, Kc = get_gram_matrix(np.reshape(resid, [n_samples, 1]), width)
        objective = 0.0
        for j, xj in enumerate(xj_list):
            objective += hsic_teststat(Kc, Lc_list[j], n_samples)
        return objective
    # Estimate coefficients by minimizing the sum of HSICs using the L-BFGS method.
    coefs, _, _ = fmin_l_bfgs_b(
        func=sum_empirical_hsic, x0=initial_coef, approx_grad=True
    )
    # Recompute the residual with the optimized coefficients.
    resid = Y[:, xi]
    for j, xj in enumerate(xj_list):
        resid = resid - coefs[j] * Y[:, xj]
    return resid, coefs
def is_independent(X, Y, bw_method, alpha):
    """HSIC gamma-approximation test.

    Returns True when X and Y are judged independent at level ``alpha``
    (i.e. the HSIC p-value exceeds ``alpha``).
    """
    p_value = hsic_test_gamma(X, Y, bw_method=bw_method)[1]
    return p_value > alpha