"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""
import itertools
import warnings
import numpy as np
from sklearn.utils import check_array, resample
from statsmodels.tsa.vector_ar.var_model import VAR
from .base import _BaseLiNGAM
from .bootstrap import BootstrapResult
from .direct_lingam import DirectLiNGAM
from .hsic import hsic_test_gamma
from .utils import predict_adaptive_lasso, find_all_paths, calculate_total_effect
class VARLiNGAM:
    """Implementation of VAR-LiNGAM Algorithm [1]_

    References
    ----------
    .. [1] Aapo Hyvärinen, Kun Zhang, Shohei Shimizu, Patrik O. Hoyer.
       Estimation of a Structural Vector Autoregression Model Using Non-Gaussianity.
       Journal of Machine Learning Research, 11: 1709-1731, 2010.
    """

    def __init__(
        self,
        lags=1,
        criterion="bic",
        prune=True,
        ar_coefs=None,
        lingam_model=None,
        random_state=None,
    ):
        """Construct a VARLiNGAM model.

        Parameters
        ----------
        lags : int, optional (default=1)
            Number of lags.
        criterion : {'aic', 'fpe', 'hqic', 'bic', None}, optional (default='bic')
            Criterion used to select the best number of lags within ``lags``.
            Pass ``None`` to disable the lag search and use ``lags`` directly.
        prune : boolean, optional (default=True)
            Whether to prune the adjacency matrix of lags.
        ar_coefs : array-like, optional (default=None)
            Coefficients of the AR model with shape
            (``lags``, n_features, n_features). When given, AR estimation
            is skipped in :func:`fit`.
        lingam_model : lingam object inherits 'lingam._BaseLiNGAM', optional (default=None)
            LiNGAM model used for causal discovery. If None, the
            DirectLiNGAM algorithm is used.
        random_state : int, optional (default=None)
            Seed used by the random number generator.
        """
        # Validate user-supplied AR coefficients up front; None means
        # "estimate them later from the data" in fit().
        if ar_coefs is not None:
            ar_coefs = check_array(ar_coefs, allow_nd=True)
        self._lags = lags
        self._criterion = criterion
        self._prune = prune
        self._ar_coefs = ar_coefs
        self._lingam_model = lingam_model
        self._random_state = random_state
def fit(self, X):
    """Fit the model to X.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        Training data, where ``n_samples`` is the number of samples
        and ``n_features`` is the number of features.

    Returns
    -------
    self : object
        Returns the instance itself.
    """
    self._causal_order = None
    self._adjacency_matrices = None
    X = check_array(X)
    # Default to DirectLiNGAM when no causal-discovery model is supplied.
    model = self._lingam_model
    if model is None:
        model = DirectLiNGAM()
    elif not isinstance(model, _BaseLiNGAM):
        raise ValueError("lingam_model must be a subclass of _BaseLiNGAM")
    # Step 1: obtain AR coefficients and residuals, either by estimating a
    # VAR model or by reusing user-supplied coefficients.
    if self._ar_coefs is None:
        M_taus, lags, residuals = self._estimate_var_coefs(X)
    else:
        M_taus = self._ar_coefs
        lags = M_taus.shape[0]
        residuals = self._calc_residuals(X, M_taus, lags)
    # Step 2: run LiNGAM on the residuals to get the instantaneous model.
    model.fit(residuals)
    # Step 3: convert AR coefficients into lagged causal-effect matrices.
    B_taus = self._calc_b(X, model.adjacency_matrix_, M_taus)
    if self._prune:
        B_taus = self._pruning(X, B_taus, model.causal_order_)
    self._ar_coefs = M_taus
    self._lags = lags
    self._residuals = residuals
    self._causal_order = model.causal_order_
    self._adjacency_matrices = B_taus
    return self
# (removed stray "[docs]" link artifact left over from HTML-scraped source)
def bootstrap(self, X, n_sampling):
    """Evaluate the statistical reliability of DAG based on the bootstrapping.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        Training data, where ``n_samples`` is the number of samples
        and ``n_features`` is the number of features.
    n_sampling : int
        Number of bootstrapping samples.

    Returns
    -------
    result : VARBootstrapResult
        Returns the result of bootstrapping.
    """
    X = check_array(X)
    n_samples = X.shape[0]
    n_features = X.shape[1]
    # store initial settings
    ar_coefs = self._ar_coefs
    lags = self._lags
    criterion = self._criterion
    # Disable the lag search for the initial fit so the fitted AR
    # coefficients are produced with the configured number of lags.
    self._criterion = None
    self.fit(X)
    fitted_ar_coefs = self._ar_coefs
    # One column block for time t plus one per lag.
    total_effects = np.zeros(
        [n_sampling, n_features, n_features * (1 + self._lags)]
    )
    adjacency_matrices = []
    for i in range(n_sampling):
        # Generate a surrogate series: resample the fitted residuals and
        # propagate them through the fitted AR coefficients. The first
        # ``lags`` rows have no full history, so they are seeded with the
        # resampled residuals directly.
        sampled_residuals = resample(self._residuals, n_samples=n_samples)
        resampled_X = np.zeros((n_samples, n_features))
        for j in range(n_samples):
            if j < lags:
                resampled_X[j, :] = sampled_residuals[j]
                continue
            ar = np.zeros((1, n_features))
            for t, M in enumerate(fitted_ar_coefs):
                ar += np.dot(M, resampled_X[j - t - 1, :].T).T
            resampled_X[j, :] = ar + sampled_residuals[j]
        # restore initial settings before refitting on the surrogate data
        self._ar_coefs = ar_coefs
        self._lags = lags
        self.fit(resampled_X)
        # Store [B0, B1, ..., B_lags] concatenated horizontally.
        am = np.concatenate([*self._adjacency_matrices], axis=1)
        adjacency_matrices.append(am)
        # total effects
        for c, to in enumerate(reversed(self._causal_order)):
            # time t: only variables earlier in the causal order can be sources
            for from_ in self._causal_order[: n_features - (c + 1)]:
                total_effects[i, to, from_] = self.estimate_total_effect2(
                    n_features, from_, to
                )
            # time t-tau: any variable at an earlier time step can be a source
            for lag in range(self._lags):
                for from_ in range(n_features):
                    total_effects[
                        i, to, from_ + n_features * (lag + 1)
                    ] = self.estimate_total_effect2(n_features, from_, to, lag + 1)
    self._criterion = criterion
    return VARBootstrapResult(adjacency_matrices, total_effects)
def estimate_total_effect(self, X, from_index, to_index, from_lag=0):
    """Estimate total effect using causal model.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        Original data, where n_samples is the number of samples
        and n_features is the number of features.
    from_index : int
        Index of source variable to estimate total effect.
    to_index : int
        Index of destination variable to estimate total effect.
    from_lag : int, optional (default=0)
        Number of lags of the source variable; ``0`` means the
        contemporaneous (time t) variable.

    Returns
    -------
    total_effect : float
        Estimated total effect.
    """
    X = check_array(X)
    n_features = X.shape[1]
    # A contemporaneous effect only makes sense when the source precedes
    # the destination in the estimated causal order.
    if from_lag == 0:
        order = self._causal_order
        if order.index(from_index) > order.index(to_index):
            warnings.warn(
                f"The estimated causal effect may be incorrect because "
                f"the causal order of the destination variable (to_index={to_index}) "
                f"is earlier than the source variable (from_index={from_index})."
            )
    # Place X(t), X(t-1), ... side by side; np.roll shifts the rows so
    # column block p holds the series lagged by p steps.
    n_blocks = 1 + self._lags + from_lag
    X_joined = np.hstack(
        [np.roll(X[:, :n_features], p, axis=0) for p in range(n_blocks)]
    )
    # Predictors: the (lag-shifted) source variable plus its parents in
    # the fitted model, shifted by the same lag offset.
    am = np.concatenate(list(self._adjacency_matrices), axis=1)
    offset = n_features * from_lag
    parents = np.where(np.abs(am[from_index]) > 0)[0] + offset
    predictors = [from_index + offset] + parents.tolist()
    # Regress the destination on the predictors; the first coefficient
    # corresponds to the source, i.e. the total effect.
    coefs = predict_adaptive_lasso(X_joined, predictors, to_index)
    return coefs[0]
def estimate_total_effect2(self, n_features, from_index, to_index, from_lag=0):
    """Estimate total effect using causal model.

    Parameters
    ----------
    n_features : int
        The number of features.
    from_index : int
        Index of source variable to estimate total effect.
    to_index : int
        Index of destination variable to estimate total effect.
    from_lag : int, optional (default=0)
        Number of lags of the source variable; ``0`` means the
        contemporaneous (time t) variable.

    Returns
    -------
    total_effect : float
        Estimated total effect.
    """
    # A contemporaneous effect only makes sense when the source precedes
    # the destination in the estimated causal order.
    if from_lag == 0:
        order = self._causal_order
        if order.index(from_index) > order.index(to_index):
            warnings.warn(
                f"The estimated causal effect may be incorrect because "
                f"the causal order of the destination variable (to_index={to_index}) "
                f"is earlier than the source variable (from_index={from_index})."
            )
    # Concatenate [B0, B1, ..., B_lags] horizontally, then zero-pad the
    # bottom rows so the matrix is square over the time-indexed variables.
    stacked = np.concatenate(list(self._adjacency_matrices), axis=1)
    size = stacked.shape[1]
    square = np.zeros((size, size))
    square[: stacked.shape[0], :] = stacked
    # Shift the source index into the column block of its lag.
    source = from_index + n_features * from_lag
    return calculate_total_effect(square, source, to_index)
def get_error_independence_p_values(self):
    """Calculate the p-value matrix of independence between error variables.

    Returns
    -------
    independence_p_values : array-like, shape (n_features, n_features)
        p-value matrix of independence between error variables.
    """
    residuals = self.residuals_
    B0 = self._adjacency_matrices[0]
    # Remove the instantaneous structure: E = n (I - B0)^T, i.e. the
    # external-influence estimates e(t) = (I - B0) n(t) row by row.
    E = residuals @ (np.eye(B0.shape[0]) - B0).T
    n_samples, n_features = E.shape
    p_values = np.zeros([n_features, n_features])
    # HSIC independence test on every unordered pair of error series;
    # the result matrix is symmetric with a zero diagonal.
    for i, j in itertools.combinations(range(n_features), 2):
        _, p_value = hsic_test_gamma(E[:, [i]], E[:, [j]])
        p_values[i, j] = p_value
        p_values[j, i] = p_value
    return p_values
def _estimate_var_coefs(self, X):
    """Estimate the coefficients of the VAR part of the model.

    Returns ``(coefs, k_ar, resid)`` of the selected statsmodels VAR fit.
    """
    # XXX: VAR.fit() is not searching lags correctly, so when a criterion
    # is given we search over the lag orders manually.
    if self._criterion in ("aic", "fpe", "hqic", "bic"):
        best_fit = None
        best_score = float("Inf")
        for lag in range(1, self._lags + 1):
            candidate = VAR(X).fit(maxlags=lag, ic=None, trend="n")
            score = getattr(candidate, self._criterion)
            if score < best_score:
                best_score = score
                best_fit = candidate
        result = best_fit
    else:
        # No criterion: fit once with the configured number of lags.
        result = VAR(X).fit(maxlags=self._lags, trend="n")
    return result.coefs, result.k_ar, result.resid
def _calc_residuals(self, X, M_taus, lags):
"""Calculate residulas"""
X = X.T
n_features = X.shape[0]
n_samples = X.shape[1]
residuals = np.zeros((n_features, n_samples))
for t in range(n_samples):
if t - lags < 0:
continue
estimated = np.zeros((X.shape[0], 1))
for tau in range(1, lags + 1):
estimated += np.dot(M_taus[tau - 1], X[:, t - tau].reshape((-1, 1)))
residuals[:, t] = X[:, t] - estimated.reshape((-1,))
residuals = residuals[:, lags:].T
return residuals
def _calc_b(self, X, B0, M_taus):
"""Calculate B"""
n_features = X.shape[1]
B_taus = np.array([B0])
for M in M_taus:
B_t = np.dot((np.eye(n_features) - B0), M)
B_taus = np.append(B_taus, [B_t], axis=0)
return B_taus
def _pruning(self, X, B_taus, causal_order):
    """Prune (re-estimate) the edges of B_taus with adaptive lasso.

    For each variable x_i(t), regress it on its causal-order ancestors at
    time t plus all variables at times t-1..t-lags, and write the sparse
    coefficients back into B_taus.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        Training data.
    B_taus : array-like, shape (lags + 1, n_features, n_features)
        Matrices [B0, B1, ..., B_lags]; rows are overwritten in place.
    causal_order : list of int
        Causal order of the instantaneous (time t) model.

    Returns
    -------
    B_taus : array-like
        The pruned matrices.
    """
    n_features = X.shape[1]
    # Build windows (x(t), x(t-1), ..., x(t-lags)): flipping first makes
    # np.roll(-1) step one sample back in time; the trailing windows with
    # an incomplete history are dropped.
    stacked = [np.flip(X, axis=0)]
    for _ in range(self._lags):
        stacked.append(np.roll(stacked[-1], -1, axis=0))
    blocks = np.array(list(zip(*stacked)))[: -self._lags]
    for i in range(n_features):
        causal_order_no = causal_order.index(i)
        ancestor_indexes = causal_order[:causal_order_no]
        obj = np.zeros((len(blocks)))
        exp = np.zeros((len(blocks), causal_order_no + n_features * self._lags))
        for j, block in enumerate(blocks):
            obj[j] = block[0][i]
            # BUG FIX: was ``exp[j:] = ...``, which broadcast the row into
            # every row from j onward on each iteration (accidentally
            # quadratic). Only row j needs to be filled; the final contents
            # are identical.
            exp[j] = np.concatenate(
                [block[0][ancestor_indexes].flatten(), block[1:].flatten()],
                axis=0,
            )
        # adaptive lasso: regress x_i(t) on all candidate explanatory terms
        predictors = list(range(exp.shape[1]))
        target = len(predictors)
        X_con = np.concatenate([exp, obj.reshape(-1, 1)], axis=1)
        coef = predict_adaptive_lasso(X_con, predictors, target)
        # First ``causal_order_no`` coefficients belong to the time-t
        # ancestors (B0); the rest fill B1..B_lags block by block.
        B_taus[0][i, ancestor_indexes] = coef[:causal_order_no]
        for j in range(len(B_taus[1:])):
            start = causal_order_no + n_features * j
            B_taus[j + 1][i, :] = coef[start : start + n_features]
    return B_taus
@property
def causal_order_(self):
    """Estimated causal ordering.

    Returns
    -------
    causal_order_ : array-like, shape (n_features)
        The causal order of the fitted model, where n_features is the
        number of features. Set by :func:`fit` from the inner LiNGAM
        model's ``causal_order_``; ``None`` before fitting.
    """
    return self._causal_order
@property
def adjacency_matrices_(self):
    """Estimated adjacency matrices.

    Returns
    -------
    adjacency_matrices_ : array-like, shape (lags + 1, n_features, n_features)
        The matrices [B0, B1, ..., B_lags] of the fitted model, where
        n_features is the number of features. The first entry is the
        instantaneous matrix B0 and the remaining ``lags`` entries are
        the lagged matrices (see ``_calc_b``); ``None`` before fitting.
    """
    return self._adjacency_matrices
@property
def residuals_(self):
    """Residuals of regression.

    Returns
    -------
    residuals_ : array-like, shape (n_samples - lags, n_features)
        Residuals of the VAR regression; the first ``lags`` samples have
        no full lag history and are excluded (see ``_calc_residuals``).
    """
    return self._residuals
class VARBootstrapResult(BootstrapResult):
    """The result of bootstrapping for Time series algorithm."""

    def __init__(self, adjacency_matrices, total_effects):
        """Construct a VARBootstrapResult.

        Parameters
        ----------
        adjacency_matrices : array-like, shape (n_sampling)
            The adjacency matrix list by bootstrapping.
        total_effects : array-like, shape (n_sampling)
            The total effects list by bootstrapping.
        """
        # Storage and summary behaviour is inherited from BootstrapResult;
        # this subclass only adds time-series-aware path queries.
        super().__init__(adjacency_matrices, total_effects)
# (removed stray "[docs]" link artifact left over from HTML-scraped source)
def get_paths(
    self, from_index, to_index, from_lag=0, to_lag=0, min_causal_effect=None
):
    """Get all paths from the start variable to the end variable and their bootstrap probabilities.

    Parameters
    ----------
    from_index : int
        Index of the variable at the start of the path.
    to_index : int
        Index of the variable at the end of the path.
    from_lag : int
        Number of lag at the start of the path.
        ``from_lag`` should be greater than or equal to ``to_lag``.
    to_lag : int
        Number of lag at the end of the path.
        ``from_lag`` should be greater than or equal to ``to_lag``.
    min_causal_effect : float, optional (default=None)
        Threshold for detecting causal direction.
        Causal directions with absolute values of causal effects less than ``min_causal_effect`` are excluded.

    Returns
    -------
    paths : dict
        List of path and bootstrap probability.
        The dictionary has the following format::

            {'path': [n_paths], 'effect': [n_paths], 'probability': [n_paths]}

        where ``n_paths`` is the number of paths.
    """
    # check parameters
    if min_causal_effect is None:
        min_causal_effect = 0.0
    else:
        if not 0.0 < min_causal_effect:
            raise ValueError("min_causal_effect must be an value greater than 0.")
    if to_lag > from_lag:
        raise ValueError("from_lag should be greater than or equal to to_lag.")
    if to_lag == from_lag:
        if to_index == from_index:
            raise ValueError("The same variable is specified for from and to.")
    # Find all paths from from_index to to_index
    paths_list = []
    effects_list = []
    for am in self._adjacency_matrices:
        # Each bootstrap matrix is (n_features, n_features * (n_lags + 1)):
        # [B0, B1, ..., B_n_lags] side by side. Expand it into a square
        # matrix over time-indexed variables, where block (i, j) holds the
        # effect of variables at lag j on variables at lag i (only j >= i,
        # i.e. effects never flow backwards in time).
        expansion_m = np.zeros((am.shape[1], am.shape[1]))
        n_features = am.shape[0]
        n_lags = int(am.shape[1] / am.shape[0]) - 1
        for i in range(n_lags + 1):
            for j in range(i, n_lags + 1):
                row = n_features * i
                col = n_features * j
                # lag difference selects which B_tau block to copy
                lag = col - row
                expansion_m[row : row + n_features, col : col + n_features] = am[
                    0:n_features, lag : lag + n_features
                ]
        # Search the expanded graph between the time-indexed endpoints.
        paths, effects = find_all_paths(
            expansion_m,
            int(n_features * from_lag + from_index),
            int(n_features * to_lag + to_index),
            min_causal_effect,
        )
        # Convert path to string to make them easier to handle.
        paths_list.extend(["_".join(map(str, p)) for p in paths])
        effects_list.extend(effects)
    paths_list = np.array(paths_list)
    effects_list = np.array(effects_list)
    # Count paths
    paths_str, counts = np.unique(paths_list, axis=0, return_counts=True)
    # Sort by count (descending, i.e. by bootstrap probability)
    order = np.argsort(-counts)
    probs = counts[order] / len(self._adjacency_matrices)
    paths_str = paths_str[order]
    # Calculate median of causal effect for each path
    effects = [
        np.median(effects_list[np.where(paths_list == p)]) for p in paths_str
    ]
    result = {
        "path": [[int(i) for i in p.split("_")] for p in paths_str],
        "effect": effects,
        "probability": probs.tolist(),
    }
    return result