"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""
import itertools
import numbers
import warnings
import numpy as np
from sklearn.utils import check_array, resample
from .bootstrap import BootstrapResult
from .direct_lingam import DirectLiNGAM
from .hsic import hsic_test_gamma
from .utils import predict_adaptive_lasso, calculate_total_effect
[docs]
class MultiGroupDirectLiNGAM(DirectLiNGAM):
"""Implementation of DirectLiNGAM Algorithm with multiple groups [1]_
References
----------
.. [1] S. Shimizu. Joint estimation of linear non-Gaussian acyclic models. Neurocomputing, 81: 104-107, 2012.
"""
[docs]
def __init__(
self,
random_state=None,
prior_knowledge=None,
apply_prior_knowledge_softly=False,
):
"""Construct a model.
Parameters
----------
random_state : int, optional (default=None)
``random_state`` is the seed used by the random number generator.
prior_knowledge : array-like, shape (n_features, n_features), optional (default=None)
Prior knowledge used for causal discovery, where ``n_features`` is the number of features.
The elements of prior knowledge matrix are defined as follows [1]_:
* ``0`` : :math:`x_i` does not have a directed path to :math:`x_j`
* ``1`` : :math:`x_i` has a directed path to :math:`x_j`
* ``-1`` : No prior knowledge is available to know if either of the two cases above (0 or 1) is true.
apply_prior_knowledge_softly : boolean, optional (default=False)
If True, apply prior knowledge softly.
"""
super().__init__(random_state, prior_knowledge, apply_prior_knowledge_softly)
[docs]
def fit(self, X_list):
"""Fit the model to multiple datasets.
Parameters
----------
X_list : list, shape [X, ...]
Multiple datasets for training, where ``X`` is an dataset.
The shape of ''X'' is (n_samples, n_features),
where ``n_samples`` is the number of samples and ``n_features`` is the number of features.
Returns
-------
self : object
Returns the instance itself.
"""
# Check parameters
X_list = self._check_X_list(X_list)
if self._Aknw is not None:
if (self._n_features, self._n_features) != self._Aknw.shape:
raise ValueError(
"The shape of prior knowledge must be (n_features, n_features)"
)
else:
# Extract all partial orders in prior knowledge matrix
if not self._apply_prior_knowledge_softly:
self._partial_orders = self._extract_partial_orders(self._Aknw)
# Causal discovery
U = np.arange(self._n_features)
K = []
X_list_ = [np.copy(X) for X in X_list]
for _ in range(self._n_features):
m = self._search_causal_order(X_list_, U)
for i in U:
if i != m:
for d in range(len(X_list_)):
X_list_[d][:, i] = self._residual(
X_list_[d][:, i], X_list_[d][:, m]
)
K.append(m)
U = U[U != m]
if (self._Aknw is not None) and (not self._apply_prior_knowledge_softly):
self._partial_orders = self._partial_orders[
self._partial_orders[:, 0] != m
]
self._causal_order = K
self._adjacency_matrices = []
for X in X_list:
self._estimate_adjacency_matrix(X, prior_knowledge=self._Aknw)
self._adjacency_matrices.append(self._adjacency_matrix)
return self
[docs]
def bootstrap(self, X_list, n_sampling):
"""Evaluate the statistical reliability of DAG based on the bootstrapping.
Parameters
----------
X_list : array-like, shape (X, ...)
Multiple datasets for training, where ``X`` is an dataset.
The shape of ''X'' is (n_samples, n_features),
where ``n_samples`` is the number of samples and ``n_features`` is the number of features.
n_sampling : int
Number of bootstrapping samples.
Returns
-------
results : array-like, shape (BootstrapResult, ...)
Returns the results of bootstrapping for multiple datasets.
"""
# Check parameters
X_list = self._check_X_list(X_list)
if isinstance(n_sampling, (numbers.Integral, np.integer)):
if not 0 < n_sampling:
raise ValueError("n_sampling must be an integer greater than 0.")
else:
raise ValueError("n_sampling must be an integer greater than 0.")
# Bootstrapping
adjacency_matrices_list = np.zeros(
[len(X_list), n_sampling, self._n_features, self._n_features]
)
total_effects_list = np.zeros(
[len(X_list), n_sampling, self._n_features, self._n_features]
)
for n in range(n_sampling):
resampled_X_list = [resample(X) for X in X_list]
self.fit(resampled_X_list)
for i, am in enumerate(self._adjacency_matrices):
adjacency_matrices_list[i][n] = am
# Calculate total effects
for c, from_ in enumerate(self._causal_order):
for to in self._causal_order[c + 1 :]:
effects = self.estimate_total_effect2(from_, to)
for i, effect in enumerate(effects):
total_effects_list[i, n, to, from_] = effect
result_list = []
for am, te in zip(adjacency_matrices_list, total_effects_list):
result_list.append(BootstrapResult(am, te))
return result_list
[docs]
def estimate_total_effect(self, X_list, from_index, to_index):
"""Estimate total effect using causal model.
Parameters
----------
X_list : array-like, shape (X, ...)
Multiple datasets for training, where ``X`` is an dataset.
The shape of ''X'' is (n_samples, n_features),
where ``n_samples`` is the number of samples and ``n_features`` is the number of features.
from_index :
Index of source variable to estimate total effect.
to_index :
Index of destination variable to estimate total effect.
Returns
-------
total_effect : float
Estimated total effect.
"""
# Check parameters
X_list = self._check_X_list(X_list)
# Check from/to causal order
from_order = self._causal_order.index(from_index)
to_order = self._causal_order.index(to_index)
if from_order > to_order:
warnings.warn(
f"The estimated causal effect may be incorrect because "
f"the causal order of the destination variable (to_index={to_index}) "
f"is earlier than the source variable (from_index={from_index})."
)
effects = []
for X, am in zip(X_list, self._adjacency_matrices):
# from_index + parents indices
parents = np.where(np.abs(am[from_index]) > 0)[0]
predictors = [from_index]
predictors.extend(parents)
# Estimate total effect
coefs = predict_adaptive_lasso(X, predictors, to_index)
effects.append(coefs[0])
return effects
[docs]
def estimate_total_effect2(self, from_index, to_index):
"""Estimate total effect using causal model.
Parameters
----------
from_index :
Index of source variable to estimate total effect.
to_index :
Index of destination variable to estimate total effect.
Returns
-------
total_effect : float
Estimated total effect.
"""
# Check from/to causal order
from_order = self._causal_order.index(from_index)
to_order = self._causal_order.index(to_index)
if from_order > to_order:
warnings.warn(
f"The estimated causal effect may be incorrect because "
f"the causal order of the destination variable (to_index={to_index}) "
f"is earlier than the source variable (from_index={from_index})."
)
effects = []
for am in self._adjacency_matrices:
effect = calculate_total_effect(am, from_index, to_index)
effects.append(effect)
return effects
[docs]
def get_error_independence_p_values(self, X_list):
"""Calculate the p-value matrix of independence between error variables.
Parameters
----------
X_list : array-like, shape (X, ...)
Multiple datasets for training, where ``X`` is an dataset.
The shape of ''X'' is (n_samples, n_features),
where ``n_samples`` is the number of samples and ``n_features`` is the number of features.
Returns
-------
independence_p_values : array-like, shape (n_datasets, n_features, n_features)
p-value matrix of independence between error variables.
"""
# Check parameters
X_list = self._check_X_list(X_list)
p_values = np.zeros([len(X_list), self._n_features, self._n_features])
for d, (X, am) in enumerate(zip(X_list, self._adjacency_matrices)):
n_samples = X.shape[0]
E = X - np.dot(am, X.T).T
for i, j in itertools.combinations(range(self._n_features), 2):
_, p_value = hsic_test_gamma(
np.reshape(E[:, i], [n_samples, 1]),
np.reshape(E[:, j], [n_samples, 1]),
)
p_values[d, i, j] = p_value
p_values[d, j, i] = p_value
return p_values
def _check_X_list(self, X_list):
"""Check input X list."""
if not isinstance(X_list, list):
raise ValueError("X_list must be a list.")
if len(X_list) < 2:
raise ValueError("X_list must be a list containing at least two items")
self._n_features = check_array(X_list[0]).shape[1]
X_list_ = []
for X in X_list:
X_ = check_array(X)
if X_.shape[1] != self._n_features:
raise ValueError(
"X_list must be a list with the same number of features"
)
X_list_.append(X_)
return X_list_
def _search_causal_order(self, X_list, U):
"""Search the causal ordering."""
Uc, Vj = self._search_candidate(U)
if len(Uc) == 1:
return Uc[0]
total_size = 0
for X in X_list:
total_size += len(X)
MG_list = []
for i in Uc:
MG = 0
for X in X_list:
M = 0
for j in U:
if i != j:
xi_std = (X[:, i] - np.mean(X[:, i])) / np.std(X[:, i])
xj_std = (X[:, j] - np.mean(X[:, j])) / np.std(X[:, j])
ri_j = (xi_std if i in Vj and j in Uc else self._residual(xi_std, xj_std))
rj_i = (xj_std if j in Vj and i in Uc else self._residual(xj_std, xi_std))
M += (np.min([0, self._diff_mutual_info(xi_std, xj_std, ri_j, rj_i)]) ** 2)
MG += M * (len(X) / total_size)
MG_list.append(-1.0 * MG)
return Uc[np.argmax(MG_list)]
@property
def adjacency_matrices_(self):
"""Estimated adjacency matrices.
Returns
-------
adjacency_matrices_ : array-like, shape (B, ...)
The list of adjacency matrix B for multiple datasets.
The shape of B is (n_features, n_features), where
n_features is the number of features.
"""
return self._adjacency_matrices