"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""
import copy
import itertools
import numpy as np
from pygam import LinearGAM
from sklearn.utils import check_array
from .hsic import hsic_test_gamma
from .utils import f_correlation
[docs]
class CAMUV:
"""Implementation of CAM-UV Algorithm [1]_
References
----------
.. [1] T.N.Maeda and S.Shimizu. Causal additive models with unobserved variables.
In Proc. Thirty-Seventh Conference on Uncertainty in Artificial Intelligence (UAI). PMLR 161:97-106, 2021.
"""
[docs]
def __init__(
self, alpha=0.01, num_explanatory_vals=2, independence="hsic", ind_corr=0.5
):
"""Construct a CAM-UV model.
Parameters
----------
alpha : float, optional (default=0.01)
Alpha level.
num_explanatory_vals : int, optional (default=2)
Maximum number of explanatory variables.
independence : {'hsic', 'fcorr'}, optional (default='hsic')
Methods to determine independence. If 'hsic' is set, test for independence by HSIC.
If 'fcorr' is set, independence is determined by F-correlation.
ind_corr : float, optional (default=0.5)
The threshold value for determining independence by F-correlation;
independence is determined when the value of F-correlation is below this threshold value.
"""
# Check parameters
if num_explanatory_vals <= 0:
raise ValueError("num_explanatory_vals must be > 0.")
if alpha < 0:
raise ValueError("alpha must be >= 0.")
if independence not in ("hsic", "fcorr"):
raise ValueError("independence must be 'hsic' or 'fcorr'.")
if ind_corr < 0.0:
raise ValueError("ind_corr must be an float greater than 0.")
self._num_explanatory_vals = num_explanatory_vals
self._alpha = alpha
self._independence = independence
self._ind_corr = ind_corr
def fit(self, X):
X = check_array(X)
n = X.shape[0]
d = X.shape[1]
N = self._get_neighborhoods(X)
P = self._find_parents(X, self._num_explanatory_vals, N)
U = []
for i in range(d):
for j in range(d)[i + 1 :]:
if (i in P[j]) or (j in P[i]):
continue
if (i not in N[j]) or (j not in N[i]):
continue
i_residual = self._get_residual(X, i, P[i])
j_residual = self._get_residual(X, j, P[j])
in_X = np.reshape(i_residual, [n, 1])
in_Y = np.reshape(j_residual, [n, 1])
if not self._is_independent(in_X, in_Y):
if not set([i, j]) in U:
U.append(set([i, j]))
return self._estimate_adjacency_matrix(X, P, U)
def _get_residual(self, X, explained_i, explanatory_ids):
explanatory_ids = list(explanatory_ids)
if len(explanatory_ids) == 0:
residual = X[:, explained_i]
else:
gam = LinearGAM().fit(X[:, explanatory_ids], X[:, explained_i])
residual = X[:, explained_i] - gam.predict(X[:, explanatory_ids])
return residual
def _is_independent(self, X, Y):
if self._independence == "hsic":
threshold = self._alpha
elif self._independence == "fcorr":
threshold = self._ind_corr
is_independent, _ = self._is_independent_by(X, Y, threshold)
return is_independent
def _is_independent_by(self, X, Y, threshold):
is_independent = False
if self._independence == "hsic":
_, value = hsic_test_gamma(X, Y)
is_independent = value > threshold
elif self._independence == "fcorr":
value = f_correlation(X, Y)
is_independent = value < threshold
return is_independent, value
def _get_neighborhoods(self, X):
n = X.shape[0]
d = X.shape[1]
N = [set() for i in range(d)]
for i in range(d):
for j in range(d)[i + 1 :]:
in_X = np.reshape(X[:, i], [n, 1])
in_Y = np.reshape(X[:, j], [n, 1])
if not self._is_independent(in_X, in_Y):
N[i].add(j)
N[j].add(i)
return N
def _find_parents(self, X, maxnum_vals, N):
n = X.shape[0]
d = X.shape[1]
P = [set() for i in range(d)] # Parents
t = 2
Y = copy.deepcopy(X)
while True:
changed = False
variables_set_list = list(itertools.combinations(set(range(d)), t))
for variables_set in variables_set_list:
variables_set = set(variables_set)
if not self._check_identified_causality(variables_set, P):
continue
child, is_independence_with_K = self._get_child(
X, variables_set, P, N, Y
)
if not is_independence_with_K:
continue
parents = variables_set - {child}
if not self._check_independence_withou_K(parents, child, P, N, Y):
continue
for parent in parents:
P[child].add(parent)
changed = True
Y = self._get_residuals_matrix(X, Y, P, child)
if changed:
t = 2
else:
t += 1
if t > maxnum_vals:
break
for i in range(d):
non_parents = set()
for j in P[i]:
residual_i = self._get_residual(X, i, P[i] - {j})
residual_j = self._get_residual(X, j, P[j])
in_X = np.reshape(residual_i, [n, 1])
in_Y = np.reshape(residual_j, [n, 1])
if self._is_independent(in_X, in_Y):
non_parents.add(j)
P[i] = P[i] - non_parents
return P
def _get_residuals_matrix(self, X, Y_old, P, child):
Y = copy.deepcopy(Y_old)
Y[:, child] = self._get_residual(X, child, P[child])
return Y
def _get_child(self, X, variables_set, P, N, Y):
n = X.shape[0]
prev_independence = 0.0 if self._independence == "hsic" else 1.0
max_independence_child = None
for child in variables_set:
parents = variables_set - {child}
if not self._check_correlation(child, parents, N):
continue
residual = self._get_residual(X, child, parents | P[child])
in_X = np.reshape(residual, [n, 1])
in_Y = np.reshape(Y[:, list(parents)], [n, len(parents)])
is_ind, value = self._is_independent_by(in_X, in_Y, prev_independence)
if is_ind:
prev_independence = value
max_independence_child = child
if self._independence == "hsic":
is_independent = prev_independence > self._alpha
elif self._independence == "fcorr":
is_independent = prev_independence < self._ind_corr
return max_independence_child, is_independent
def _check_independence_withou_K(self, parents, child, P, N, Y):
n = Y.shape[0]
for parent in parents:
in_X = np.reshape(Y[:, child], [n, 1])
in_Y = np.reshape(Y[:, parent], [n, 1])
if self._is_independent(in_X, in_Y):
return False
return True
def _check_identified_causality(self, variables_set, P):
variables_list = list(variables_set)
for i in variables_list:
for j in variables_list[variables_list.index(i) + 1 :]:
if (j in P[i]) or (i in P[j]):
return False
return True
def _check_correlation(self, child, parents, N):
for parent in parents:
if parent not in N[child]:
return False
return True
def _estimate_adjacency_matrix(self, X, P, U):
B = np.zeros([X.shape[1], X.shape[1]], dtype="float64")
for i, parents in enumerate(P):
for parent in parents:
B[i, parent] = 1
for confounded_pair in U:
confounded_pair = list(confounded_pair)
B[confounded_pair[0], confounded_pair[1]] = np.nan
B[confounded_pair[1], confounded_pair[0]] = np.nan
self._adjacency_matrix = B
return self
@property
def adjacency_matrix_(self):
"""Estimated adjacency matrix.
Returns
-------
adjacency_matrix_ : array-like, shape (n_features, n_features)
The adjacency matrix B of fitted model, where
n_features is the number of features.
"""
return self._adjacency_matrix