"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""
import itertools
import numbers
import warnings
import numpy as np
from scipy.stats.distributions import chi2
from sklearn.utils import check_array, resample
from .bootstrap import BootstrapResult
from .hsic import hsic_test_gamma
from .utils import predict_adaptive_lasso, f_correlation, calculate_total_effect
# [docs]  -- Sphinx HTML link anchor left over from a docs scrape; commented out
#            because a bare "[docs]" expression raises NameError at import time.
class BottomUpParceLiNGAM:
    """Implementation of ParceLiNGAM Algorithm [1]_

    References
    ----------
    .. [1] T. Tashiro, S. Shimizu, and A. Hyvärinen.
       ParceLiNGAM: a causal ordering method robust against latent confounders.
       Neural computation, 26.1: 57-83, 2014.
    """

    def __init__(
        self,
        random_state=None,
        alpha=0.1,
        regressor=None,
        prior_knowledge=None,
        independence="hsic",
        ind_corr=0.5,
    ):
        """Construct a BottomUpParceLiNGAM model.

        Parameters
        ----------
        random_state : int, optional (default=None)
            ``random_state`` is the seed used by the random number generator.
        alpha : float, optional (default=0.1)
            Significance level of statistical test. If alpha=0.0, rejection
            does not occur in statistical tests.
        regressor : regressor object implementing 'fit' and 'predict' function (default=None)
            Regressor to compute residuals.
            This regressor object must have ``fit`` method and ``predict``
            function like scikit-learn's model.
        prior_knowledge : array-like, shape (n_features, n_features), optional (default=None)
            Prior knowledge used for causal discovery, where ``n_features`` is
            the number of features. The elements of prior knowledge matrix are
            defined as follows [1]_:

            * ``0`` : :math:`x_i` does not have a directed path to :math:`x_j`
            * ``1`` : :math:`x_i` has a directed path to :math:`x_j`
            * ``-1`` : No prior knowledge is available to know if either of the
              two cases above (0 or 1) is true.
        independence : {'hsic', 'fcorr'}, optional (default='hsic')
            Methods to determine independence. If 'hsic' is set, test for
            independence by HSIC. If 'fcorr' is set, independence is determined
            by F-correlation.
        ind_corr : float, optional (default=0.5)
            The threshold value for determining independence by F-correlation;
            independence is determined when the value of F-correlation is below
            this threshold value.

        Raises
        ------
        ValueError
            If ``regressor`` lacks ``fit``/``predict``, ``alpha`` or
            ``ind_corr`` is negative, or ``independence`` is unknown.
        """
        # Check parameters
        if regressor is not None:
            if not (hasattr(regressor, "fit") and hasattr(regressor, "predict")):
                raise ValueError("'regressor' has no fit or predict method.")
        if alpha < 0.0:
            # alpha == 0.0 is allowed: it disables rejection in the tests.
            raise ValueError("alpha must be a float greater than or equal to 0.")
        if independence not in ("hsic", "fcorr"):
            raise ValueError("independence must be 'hsic' or 'fcorr'.")
        if ind_corr < 0.0:
            raise ValueError("ind_corr must be a float greater than or equal to 0.")

        self._random_state = random_state
        self._alpha = alpha
        self._causal_order = None
        self._adjacency_matrix = None
        self._reg = regressor
        self._Aknw = prior_knowledge
        self._independence = independence
        self._ind_corr = ind_corr

        if self._Aknw is not None:
            self._Aknw = check_array(self._Aknw)
            # Store "-1" (unknown) prior-knowledge entries as NaN internally.
            self._Aknw = np.where(self._Aknw < 0, np.nan, self._Aknw)

    def fit(self, X):
        """Fit the model to X.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where ``n_samples`` is the number of samples
            and ``n_features`` is the number of features.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        # Reset any state left over from a previous fit.
        # NOTE: the original code reset only ``_adjacency_matrices`` (plural),
        # an attribute not read anywhere in this class, leaving a stale
        # ``_adjacency_matrix`` from an earlier fit; both are reset here.
        self._causal_order = None
        self._adjacency_matrix = None
        self._adjacency_matrices = None

        # Check parameters
        X = check_array(X)
        n_features = X.shape[1]

        # Check prior knowledge
        if self._Aknw is not None:
            if (n_features, n_features) != self._Aknw.shape:
                raise ValueError(
                    "The shape of prior knowledge must be (n_features, n_features)"
                )
            else:
                # Extract all partial orders in prior knowledge matrix
                self._partial_orders = self._extract_partial_orders(self._Aknw)

        # Center variables for each group
        X = X - np.tile(np.mean(X, axis=0), (X.shape[0], 1))

        # Bonferroni correction over the n_features - 1 tests per step
        self._thresh_p = self._alpha / (n_features - 1)

        # Search causal orders one by one from the bottom upward
        K_bttm, p_bttm = self._search_causal_order(X)
        U_res = list(np.setdiff1d(np.arange(n_features), K_bttm))
        K = []
        # Features whose order could not be determined are grouped in a list.
        if len(U_res) > 1:
            K = [U_res]
        for i in K_bttm:
            K.append(i)

        self._causal_order = K
        self._p_list = p_bttm
        return self._estimate_adjacency_matrix(X, prior_knowledge=self._Aknw)

    def _extract_partial_orders(self, pk):
        """Extract partial orders ([from, to] pairs) from prior knowledge.

        Raises ValueError if the prior knowledge asserts a path in both
        directions for the same pair.
        """
        path_pairs = np.array(np.where(pk == 1)).transpose()
        no_path_pairs = np.array(np.where(pk == 0)).transpose()

        # Check for inconsistencies in pairs with path
        check_pairs = np.concatenate([path_pairs, path_pairs[:, [1, 0]]])
        if len(check_pairs) > 0:
            pairs, counts = np.unique(check_pairs, axis=0, return_counts=True)
            if len(pairs[counts > 1]) > 0:
                raise ValueError(
                    f"The prior knowledge contains inconsistencies at the following indices: {pairs[counts>1].tolist()}"
                )

        # Check for inconsistencies in pairs without path.
        # If there are duplicate pairs without path, they cancel out and are
        # not ordered.
        check_pairs = np.concatenate([no_path_pairs, no_path_pairs[:, [1, 0]]])
        if len(check_pairs) > 0:
            pairs, counts = np.unique(check_pairs, axis=0, return_counts=True)
            check_pairs = np.concatenate([no_path_pairs, pairs[counts > 1]])
            pairs, counts = np.unique(check_pairs, axis=0, return_counts=True)
            no_path_pairs = pairs[counts < 2]

        check_pairs = np.concatenate([path_pairs, no_path_pairs[:, [1, 0]]])
        if len(check_pairs) == 0:
            # If no pairs are extracted from the specified prior knowledge,
            return check_pairs

        pairs = np.unique(check_pairs, axis=0)
        return pairs[:, [1, 0]]  # [to, from] -> [from, to]

    def _search_candidate(self, U):
        """Search for candidate sink features among ``U``."""
        # If no prior knowledge is specified, nothing to do.
        if self._Aknw is None:
            return U

        # Candidate features that are not to the left of the partial orders
        if len(self._partial_orders) != 0:
            Uc = [i for i in U if i not in self._partial_orders[:, 0]]
            return Uc
        return U

    def _search_causal_order(self, X):
        """Search causal orders one by one from the bottom upward."""
        U = np.arange(X.shape[1])
        K_bttm = []
        p_bttm = []

        is_search_causal_order = True
        while is_search_causal_order:
            # Search for candidate features
            Uc = self._search_candidate(U)

            # Find the most sink variable
            m, _, eval_stat = self._find_exo_vec(X, Uc, U)

            # Conduct statistical test by the p-value or the statistic.
            # If the statistical test is not rejected:
            if not self._is_reject(eval_stat):
                # Add index of the exogenous variable to K_bttm
                K_bttm = np.append(m, K_bttm).astype(np.int64)
                p_bttm.insert(0, eval_stat)

                # Update U and partial orders
                U = U[U != m]
                if self._Aknw is not None:
                    self._partial_orders = self._partial_orders[
                        self._partial_orders[:, 1] != m
                    ]

                # If there is only one candidate for sink variable, the search ends
                if len(U) <= 1:
                    K_bttm = np.append(U, K_bttm).astype(np.int64)
                    p_bttm.insert(0, 0.0)
                    is_search_causal_order = False
            # If the statistical test is rejected:
            else:
                is_search_causal_order = False

        return K_bttm, p_bttm

    def _find_exo_vec(self, X, Uc, U):
        """Find the most exogenous vector among the candidates ``Uc``.

        Returns ``(m, exo_vec, eval_stat)`` where ``m`` is the candidate sink
        variable(s), ``exo_vec`` the remaining (exogenous) candidates, and
        ``eval_stat`` the independence evaluation (a Fisher p-value for
        'hsic', an F-correlation value for 'fcorr').
        """
        eval_stat = np.inf
        exo_vec = []
        if len(Uc) == 1:
            # If there is only one variable in Uc,
            # calculate HSIC with the rest of the variables
            m = np.array([Uc[0]])
            predictors = np.setdiff1d(U, Uc[0])
            R = self._compute_residuals(X, predictors, m)
            if self._independence == "hsic":
                eval_stat, _ = self._fisher_hsic_test(X[:, predictors], R, np.inf)
            elif self._independence == "fcorr":
                eval_stat = self._f_correlation(X[:, predictors], R)
            return m, [], eval_stat
        else:
            max_p_stat = np.inf
            for j in range(len(Uc)):
                xi_index = np.setdiff1d(Uc, Uc[j])
                xj_index = np.array([Uc[j]])

                # Compute residuals
                R = self._compute_residuals(X, xi_index, xj_index)

                if self._independence == "hsic":
                    # HSIC test with Fisher's method
                    fisher_p, fisher_stat = self._fisher_hsic_test(
                        X[:, xi_index], R, max_p_stat
                    )
                    # Update output: smaller combined statistic means
                    # "more independent" residuals.
                    if fisher_stat < max_p_stat:
                        exo_vec = xi_index
                        eval_stat = fisher_p
                        max_p_stat = fisher_stat
                elif self._independence == "fcorr":
                    f_corr = self._f_correlation(X[:, xi_index], R)
                    # Update output
                    if f_corr < eval_stat:
                        exo_vec = xi_index
                        eval_stat = f_corr

            m = np.setdiff1d(Uc, exo_vec)
            return m, exo_vec, eval_stat

    def _is_reject(self, eval_stat):
        """Return True if the independence test rejects the candidate sink."""
        is_reject = False
        if self._independence == "hsic":
            # Reject when the (Bonferroni-corrected) p-value is too small.
            if eval_stat < self._thresh_p:
                is_reject = True
        elif self._independence == "fcorr":
            # Reject when the F-correlation is at or above the threshold.
            if eval_stat >= self._ind_corr:
                is_reject = True
        return is_reject

    def _compute_residuals(self, X, predictors, target):
        """Compute residuals of regressing ``target`` on ``predictors``."""
        if self._reg is None:
            # Compute residuals of least square regressions
            cov = np.cov(X.T)
            coef = np.dot(
                np.linalg.pinv(cov[np.ix_(predictors, predictors)]),
                cov[np.ix_(target, predictors)].reshape(predictors.shape[0], 1),
            )
            R = X[:, target] - np.dot(X[:, predictors], coef)
        else:
            self._reg.fit(X[:, predictors], np.ravel(X[:, target]))
            R = X[:, target] - self._reg.predict(X[:, predictors]).reshape(-1, 1)
        return R

    def _fisher_hsic_test(self, X, R, max_p_stat):
        """Conduct statistical test by HSIC and Fisher's method.

        The per-column HSIC p-values are combined as -2*sum(log p), which is
        chi-squared with 2*n_features degrees of freedom under independence.
        The loop stops early once the combined statistic exceeds
        ``max_p_stat`` (the current best), since the candidate cannot win.
        """
        fisher_stat = 0
        n_features = X.shape[1]

        if n_features == 1:
            fisher_stat, fisher_p = hsic_test_gamma(X, R)
        else:
            for i in range(n_features):
                _, hsic_p = hsic_test_gamma(X[:, [i]], R)
                # Guard against log(0) when the HSIC p-value underflows.
                fisher_stat += np.inf if hsic_p == 0 else -2 * np.log(hsic_p)
                if fisher_stat > max_p_stat:
                    break
            fisher_p = 1 - chi2.cdf(fisher_stat, df=2 * n_features)
        return fisher_p, fisher_stat

    def _f_correlation(self, X, R):
        """Return the maximum F-correlation between columns of X and R."""
        max_f_corr = 0.0
        n_features = X.shape[1]

        if n_features == 1:
            max_f_corr = f_correlation(X, R)
        else:
            for i in range(n_features):
                f_corr = f_correlation(X[:, [i]], R)
                if f_corr > max_f_corr:
                    max_f_corr = f_corr
        return max_f_corr

    def _flatten(self, arr):
        """Return a copy of an array flattened in one dimension."""
        return [
            val
            for item in arr
            for val in (
                self._flatten(item)
                if hasattr(item, "__iter__") and not isinstance(item, str)
                else [item]
            )
        ]

    def _estimate_adjacency_matrix(self, X, prior_knowledge=None):
        """Estimate adjacency matrix by causal order.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where n_samples is the number of samples
            and n_features is the number of features.
        prior_knowledge : array-like, shape (n_variables, n_variables), optional (default=None)
            Prior knowledge matrix.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        if prior_knowledge is not None:
            pk = prior_knowledge.copy()
            np.fill_diagonal(pk, 0)

        B = np.zeros([X.shape[1], X.shape[1]], dtype="float64")
        for i in range(1, len(self._causal_order)):
            target = self._causal_order[i]
            # Flatten the array into one dimension
            predictors = self._flatten(self._causal_order[:i])

            # Exclude variables specified in no_path with prior knowledge
            if prior_knowledge is not None:
                predictors = [p for p in predictors if pk[target, p] != 0]

            # target is exogenous variables if predictors are empty
            if len(predictors) != 0:
                B[target, predictors] = predict_adaptive_lasso(X, predictors, target)

        # Set np.nan between variables whose mutual order is unknown
        for unk_order in self._causal_order:
            if hasattr(unk_order, "__iter__") and not isinstance(unk_order, str):
                for i in range(len(unk_order) - 1):
                    xi = unk_order[i]
                    for xj in unk_order[i + 1 :]:
                        B[xi, xj] = np.nan
                        B[xj, xi] = np.nan

        self._adjacency_matrix = B
        return self

    def _causal_order_position(self, index):
        """Return the position of ``index`` in the estimated causal order.

        Elements of ``self._causal_order`` are either single indices or
        lists of indices whose mutual order is unknown.

        Raises
        ------
        ValueError
            If ``index`` does not appear in the causal order. (The original
            code left the position variable unbound in that case, raising a
            confusing NameError.)
        """
        for pos, order in enumerate(self._causal_order):
            if hasattr(order, "__iter__") and index in order:
                return pos
            if not hasattr(order, "__iter__") and int(index) == int(order):
                return pos
        raise ValueError(f"The index {index} is not in the causal order.")

    def estimate_total_effect(self, X, from_index, to_index):
        """Estimate total effect using causal model.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Original data, where n_samples is the number of samples
            and n_features is the number of features.
        from_index :
            Index of source variable to estimate total effect.
        to_index :
            Index of destination variable to estimate total effect.

        Returns
        -------
        total_effect : float
            Estimated total effect. ``np.nan`` if the source variable is
            influenced by latent confounders.
        """
        # Check parameters
        X = check_array(X)

        # Check from/to causal order
        from_order = self._causal_order_position(from_index)
        to_order = self._causal_order_position(to_index)
        if from_order > to_order:
            warnings.warn(
                f"The estimated causal effect may be incorrect because "
                f"the causal order of the destination variable (to_index={to_index}) "
                f"is earlier than the source variable (from_index={from_index})."
            )

        # Check confounders
        if np.isnan(self._adjacency_matrix[from_index]).any():
            warnings.warn(
                f"The estimated causal effect may be incorrect because "
                f"the source variable (from_index={from_index}) is influenced by confounders."
            )
            return np.nan

        # from_index + parents indices
        parents = np.where(np.abs(self._adjacency_matrix[from_index]) > 0)[0]
        predictors = [from_index]
        predictors.extend(parents)

        # Estimate total effect
        coefs = predict_adaptive_lasso(X, predictors, to_index)
        return coefs[0]

    def estimate_total_effect2(self, from_index, to_index):
        """Estimate total effect using the already-estimated causal model.

        Parameters
        ----------
        from_index :
            Index of source variable to estimate total effect.
        to_index :
            Index of destination variable to estimate total effect.

        Returns
        -------
        total_effect : float
            Estimated total effect. ``np.nan`` if the source variable is
            influenced by latent confounders.
        """
        # Check from/to causal order
        from_order = self._causal_order_position(from_index)
        to_order = self._causal_order_position(to_index)
        if from_order > to_order:
            warnings.warn(
                f"The estimated causal effect may be incorrect because "
                f"the causal order of the destination variable (to_index={to_index}) "
                f"is earlier than the source variable (from_index={from_index})."
            )

        # Check confounders
        if np.isnan(self._adjacency_matrix[from_index]).any():
            warnings.warn(
                f"The estimated causal effect may be incorrect because "
                f"the source variable (from_index={from_index}) is influenced by confounders."
            )
            return np.nan

        effect = calculate_total_effect(self._adjacency_matrix, from_index, to_index)
        return effect

    def get_error_independence_p_values(self, X):
        """Calculate the p-value matrix of independence between error variables.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Original data, where n_samples is the number of samples
            and n_features is the number of features.

        Returns
        -------
        independence_p_values : array-like, shape (n_features, n_features)
            p-value matrix of independence between error variables.
            Entries involving variables with unknown order are ``np.nan``.
        """
        # Check parameters
        X = check_array(X)
        n_samples = X.shape[0]
        n_features = X.shape[1]

        E = X - np.dot(self._adjacency_matrix, X.T).T
        # Columns containing NaN correspond to variables whose order is unknown.
        nan_cols = list(set(np.argwhere(np.isnan(self._adjacency_matrix)).ravel()))
        p_values = np.zeros([n_features, n_features])
        for i, j in itertools.combinations(range(n_features), 2):
            if i in nan_cols or j in nan_cols:
                p_values[i, j] = np.nan
                p_values[j, i] = np.nan
            else:
                _, p_value = hsic_test_gamma(
                    np.reshape(E[:, i], [n_samples, 1]),
                    np.reshape(E[:, j], [n_samples, 1]),
                )
                p_values[i, j] = p_value
                p_values[j, i] = p_value
        return p_values

    @property
    def causal_order_(self):
        """Estimated causal ordering.

        Returns
        -------
        causal_order_ : array-like, shape (n_features)
            The causal order of fitted model, where
            n_features is the number of features.
            Set the features as a list if order is unknown.
        """
        return self._causal_order

    @property
    def adjacency_matrix_(self):
        """Estimated adjacency matrix.

        Returns
        -------
        adjacency_matrix_ : array-like, shape (n_features, n_features)
            The adjacency matrix B of fitted model, where
            n_features is the number of features.
            Set np.nan if order is unknown.
        """
        return self._adjacency_matrix

    def bootstrap(self, X, n_sampling):
        """Evaluate the statistical reliability of DAG based on the bootstrapping.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where ``n_samples`` is the number of samples
            and ``n_features`` is the number of features.
        n_sampling : int
            Number of bootstrapping samples.

        Returns
        -------
        result : BootstrapResult
            Returns the result of bootstrapping.
        """
        # Check parameters
        X = check_array(X)
        if not isinstance(n_sampling, (numbers.Integral, np.integer)) or n_sampling <= 0:
            raise ValueError("n_sampling must be an integer greater than 0.")

        # Bootstrapping
        adjacency_matrices = np.zeros([n_sampling, X.shape[1], X.shape[1]])
        total_effects = np.zeros([n_sampling, X.shape[1], X.shape[1]])
        for i in range(n_sampling):
            resampled_X = resample(X)
            self.fit(resampled_X)
            adjacency_matrices[i] = self._adjacency_matrix

            # Calculate total effects for every pair ordered by causal order
            for c, from_ in enumerate(self._causal_order):
                for to in self._causal_order[c + 1 :]:
                    if hasattr(from_, "__iter__"):
                        # A group of variables whose mutual order is unknown
                        for from_item in from_:
                            total_effects[
                                i, to, from_item
                            ] = self.estimate_total_effect2(from_item, to)
                    else:
                        total_effects[i, to, from_] = self.estimate_total_effect2(
                            from_, to
                        )

        return BootstrapResult(adjacency_matrices, total_effects)