"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""
import numpy as np
from sklearn.utils import check_array
from .base import _BaseLiNGAM
from .hsic import hsic_test_gamma
class RESIT(_BaseLiNGAM):
    """Implementation of RESIT (regression with subsequent independence test) algorithm [1]_

    References
    ----------
    .. [1] Jonas Peters, Joris M. Mooij, Dominik Janzing, and Bernhard Schölkopf.
       Causal discovery with continuous additive noise models.
       Journal of Machine Learning Research, 15:2009-2053, 2014.

    Notes
    -----
    RESIT algorithm returns an **adjacency matrix consisting of zeros or ones**,
    rather than an adjacency matrix consisting of causal coefficients,
    in order to estimate nonlinear causality.
    """

    def __init__(self, regressor, random_state=None, alpha=0.01):
        """Construct a RESIT model.

        Parameters
        ----------
        regressor : regressor object implementing 'fit' and 'predict' function (default=None)
            Regressor to compute residuals.
            This regressor object must have ``fit`` method and ``predict`` function like scikit-learn's model.
        random_state : int, optional (default=None)
            ``random_state`` is the seed used by the random number generator.
        alpha : float, optional (default=0.01)
            Alpha level for HSIC independence test when removing superfluous edges.

        Raises
        ------
        ValueError
            If ``regressor`` is None or lacks ``fit``/``predict``, or if
            ``alpha`` is negative.
        """
        # Check parameters
        if regressor is None:
            raise ValueError("Specify regression model in 'regressor'.")
        if not (hasattr(regressor, "fit") and hasattr(regressor, "predict")):
            raise ValueError("'regressor' has no fit or predict method.")

        # alpha == 0.0 is allowed: edge removal requires hsic_p > alpha,
        # so zero keeps only edges whose p-value is exactly 0.
        if alpha < 0.0:
            raise ValueError("alpha must be a float greater than or equal to 0.")

        super().__init__(random_state)
        self._alpha = alpha
        self._reg = regressor

    def fit(self, X):
        """Fit the model to X.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data, where ``n_samples`` is the number of samples
            and ``n_features`` is the number of features.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        # Check parameters
        X = check_array(X)
        n_features = X.shape[1]

        # Phase 1: determine topological order
        pa, pi = self._estimate_order(X)

        # Phase 2: remove superfluous edges
        pa = self._remove_edges(X, pa, pi)

        # Create adjacency matrix from parent-child relationship.
        # Entries are 0/1 indicators, not causal coefficients (see class Notes).
        adjacency_matrix = np.zeros([n_features, n_features])
        for i, parents in pa.items():
            for p in parents:
                adjacency_matrix[i, p] = 1

        self._causal_order = pi
        self._adjacency_matrix = adjacency_matrix
        return self

    def _estimate_order(self, X):
        """Determine a topological (causal) order of the variables.

        Repeatedly finds the variable whose regression residual is least
        dependent (smallest HSIC statistic) on the remaining variables,
        marks it as a sink, and removes it from the candidate set.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data.

        Returns
        -------
        pa : dict
            Maps each variable index to the list of its candidate parent indices.
        pi : list
            Estimated causal order of variable indices, causes first.
        """
        S = np.arange(X.shape[1])
        pa = {}
        pi = []
        for _ in range(X.shape[1]):
            if len(S) == 1:
                # Only reached on the final iteration: the last remaining
                # variable heads the causal order and has no parents.
                pa[S[0]] = []
                pi.insert(0, S[0])
                continue

            hsic_stats = []
            for k in S:
                # Regress Xk on the remaining candidates {Xi}
                predictors = [i for i in S if i != k]
                self._reg.fit(X[:, predictors], X[:, k])
                residual = X[:, k] - self._reg.predict(X[:, predictors])

                # Measure dependence between residuals and {Xi};
                # only the test statistic is needed here, not the p-value.
                hsic_stat, _ = hsic_test_gamma(residual, X[:, predictors])
                hsic_stats.append(hsic_stat)

            # The variable with the least dependent residual is a sink:
            # place it first in pi (building the order back-to-front) and
            # record the remaining variables as its candidate parents.
            k = S[np.argmin(hsic_stats)]
            S = S[S != k]
            pa[k] = S.tolist()
            pi.insert(0, k)
        return pa, pi

    def _remove_edges(self, X, pa, pi):
        """Remove superfluous edges via HSIC independence tests.

        For each non-root variable, each parent is tentatively dropped;
        if the residual of the regression on the reduced parent set is
        still independent of that set (p-value above ``alpha``), the
        dropped edge is deemed superfluous and removed.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training data.
        pa : dict
            Candidate parent sets from :meth:`_estimate_order`; mutated in place.
        pi : list
            Causal order from :meth:`_estimate_order`.

        Returns
        -------
        pa : dict
            Pruned parent sets.
        """
        for k in range(1, X.shape[1]):
            # Iterate over a copy since pa[pi[k]] may shrink during the loop.
            parents = pa[pi[k]].copy()
            for l in parents:
                # Regress X[pi[k]] on its parents excluding l
                predictors = [i for i in pa[pi[k]] if i != l]

                # When l is the only remaining parent there is nothing left
                # to regress on, so the edge is kept.
                if len(predictors) < 1:
                    continue
                self._reg.fit(X[:, predictors], X[:, pi[k]])
                residual = X[:, pi[k]] - self._reg.predict(X[:, predictors])

                # If the residual is independent of the reduced parent set,
                # the edge from l is superfluous: eliminate it.
                _, hsic_p = hsic_test_gamma(residual, X[:, predictors])
                if hsic_p > self._alpha:
                    pa[pi[k]].remove(l)
        return pa

    def estimate_total_effect(self, X, from_index, to_index):
        """Estimate total effect using causal model.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Original data, where n_samples is the number of samples
            and n_features is the number of features.
        from_index :
            Index of source variable to estimate total effect.
        to_index :
            Index of destination variable to estimate total effect.

        Returns
        -------
        total_effect : float
            **Because RESIT is a nonlinear algorithm, it cannot estimate the
            total effect and always returns a value of zero**
        """
        return 0

    def get_error_independence_p_values(self, X):
        """Calculate the p-value matrix of independence between error variables.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Original data, where n_samples is the number of samples
            and n_features is the number of features.

        Returns
        -------
        independence_p_values : array-like, shape (n_features, n_features)
            **RESIT always returns zero**
        """
        n_features = X.shape[1]
        p_values = np.zeros([n_features, n_features])
        return p_values