Source code for lingam.causal_effect

"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""

import numpy as np
from sklearn.utils import check_array

from .base import _BaseLiNGAM


[docs] class CausalEffect(object): """Implementation of causality and prediction. [1]_ References ---------- .. [1] P. Blöbaum and S. Shimizu. Estimation of interventional effects of features on prediction. In Proc. 2017 IEEE International Workshop on Machine Learning for Signal Processing (MLSP2017), pp. 1--6, Tokyo, Japan, 2017. """
[docs] def __init__(self, causal_model): """Construct a CausalEffect. Parameters ---------- causal_model : lingam object inherits 'lingam._BaseLiNGAM' or array-like with shape (n_features, n_features) Causal model for calculating causal effects. The lingam object is ``lingam.DirectLiNGAM`` or ``lingam.ICALiNGAM``, and ``fit`` function needs to be executed already. For array-like, adjacency matrix to estimate causal effect, where ``n_features`` is the number of features. """ self._causal_model = causal_model self._B = None self._causal_order = None
def _check_init_params(self): """Check initial parameters.""" # already checked if self._B is not None: return if isinstance(self._causal_model, _BaseLiNGAM): self._B = self._causal_model.adjacency_matrix_ self._causal_order = self._causal_model.causal_order_ return elif type(self._causal_model) is np.ndarray or type(self._causal_model) is list: B = ( self._causal_model if type(self._causal_model) is np.ndarray else np.array(self._causal_model) ) if len(B.shape) != 2: raise ValueError("Specified 'causal_model' is not matrix") if B.shape[0] != B.shape[1]: raise ValueError("Specified 'causal_model' is not square matrix.") original_index = np.arange(B.shape[0]) causal_order = [] B_ = B for _ in range(B.shape[0]): zero_rows = np.where(np.sum(np.abs(B_), axis=1) < 1e-10)[0] if len(zero_rows) == 0: raise ValueError( "Specified 'causal_model' is not lower triangular matrix." ) causal_order.append(original_index[zero_rows[0]]) original_index = np.delete(original_index, zero_rows[0], 0) mask = np.delete(np.arange(len(B_)), zero_rows[0], 0) B_ = B_[mask][:, mask] self._B = B self._causal_order = causal_order return else: raise ValueError("Specified 'causal_model' cannot be used.") def _get_propagated_effects(self, En, intervention_index, intervention_value): """Get propagated effects according to causal order. Parameters ---------- En : array-like, shpae (n_features) Expectations of each noise variable. intervention_index : int Index of variable to apply intervention. intervention_value : float Value of intervention. Returns ------- propagated_effects : array-like, shpae (n_features) Propagated effects, where ``n_features`` is the number of features. """ effects = np.zeros(len(self._causal_order)) for i in self._causal_order: if i == intervention_index: effects[i] = intervention_value else: effects[i] = np.dot(self._B[i, :], effects) + En[i] return effects def _predict(self, X, pred_model): """Predict the expectation with prediction model. Parameters ---------- X : array-like, shpae (n_predictors) Predictors, where ``n_predictors`` is the number of variables. pred_model : model object implementing 'predict' or 'predict_proba' Model to predict the expectation. For linear regression or non-linear reggression, model object must have ``predict`` method. For logistic regression, model object must have ``predict_proba`` method. Returns ------- pred : float Predicted value. """ if hasattr(pred_model, "predict_proba"): p0, p1 = pred_model.predict_proba(X.reshape(1, -1))[0] pred = p0 - p1 elif hasattr(pred_model, "predict"): pred = pred_model.predict(X.reshape(1, -1))[0] else: raise ValueError("'pred_model' has no prediction method.") return pred
[docs] def estimate_effects_on_prediction(self, X, target_index, pred_model): """Estimate the intervention effect with the prediction model. Parameters ---------- X : array-like, shape (n_samples, n_features) Original data, where ``n_samples`` is the number of samples and ``n_features`` is the number of features. target_index : int Index of target variable. pred_model : model object implementing 'predict' or 'predict_proba' Model to predict the expectation. For linear regression or non-linear reggression, model object must have ``predict`` method. For logistic regression, model object must have ``predict_proba`` method. Returns ------- intervention_effects : array-like, shape (n_features, 2) Estimated values of intervention effect. The first column of the list is the value of 'E[Y|do(Xi=mean)]-E[Y|do(Xi=mean+std)]', and the second column is the value of 'E[Y|do(Xi=mean)]–E[Y|do(Xi=mean-std)]'. The maximum value in this array is the feature having the greatest intervention effect. """ # Check parameters X = check_array(X) self._check_init_params() vars_ = [i for i in range(X.shape[1]) if i != target_index] Ex = X.mean(axis=0) En = Ex - np.dot(self._B, Ex) effects = [] for i in range(X.shape[1]): # E[Y|do(Xi=mean)] Ex_do = self._get_propagated_effects(En, i, Ex[i]) Ey_do = self._predict(Ex_do[vars_], pred_model) # E[Y|do(Xi=mean)]-E[Y|do(Xi=mean+std)] Ex_do = self._get_propagated_effects(En, i, Ex[i] + X[:, i].std()) Ey1 = Ey_do - self._predict(Ex_do[vars_], pred_model) # E[Y|do(Xi=mean)]–E[Y|do(Xi=mean-std)] Ex_do = self._get_propagated_effects(En, i, Ex[i] - X[:, i].std()) Ey2 = Ey_do - self._predict(Ex_do[vars_], pred_model) effects.append([np.abs(Ey1), np.abs(Ey2)]) return np.array(effects)
[docs] def estimate_optimal_intervention( self, X, target_index, pred_model, intervention_index, desired_output ): """Estimate of the intervention such that the expectation of the prediction of the post-intervention observations is equal or close to a specified value. Parameters ---------- X : array-like, shape (n_samples, n_features) Original data, where ``n_samples`` is the number of samples and ``n_features`` is the number of features. target_index : int Index of target variable. pred_model : model object. Model to predict the expectation. Only linear regression model can be specified. Model object musst have ``coef_`` and ``intercept_`` attributes. intervention_index : int Index of variable to apply intervention. desired_output : Desired expected post-intervention output of prediction. Returns ------- optimal_intervention : float Optimal intervention on ``intervention_index`` variable. """ # Check parameters X = check_array(X) self._check_init_params() # Allow linear regression model. if not hasattr(pred_model, "coef_") and not hasattr(pred_model, "intercept_"): raise ValueError( "'pred_model' does not have regression coefficient attributes." ) if hasattr(pred_model, "predict_proba"): raise ValueError("'pred_model' is not linear regression model.") Ex = X.mean(axis=0) En = Ex - np.dot(self._B, Ex) s = [i for i in range(X.shape[1]) if i != intervention_index] root_vars = np.where(np.sum(self._B, axis=1) == 0)[0].tolist() s = [i for i in s if i not in root_vars] alpha = np.zeros(self._B.shape[1]) alpha[intervention_index] = 1 Ex[intervention_index] = 0 En[intervention_index] = 0 while len(s) > 0: k = np.random.choice(s) parents = np.where(np.abs(self._B[k]) > 0)[0].tolist() if len(list(set(parents) & set(s))) == 0: a = 0 # local alpha u = 0 # local mew for q in parents: a = a + self._B[k, q] * alpha[q] if q != intervention_index: u = u + self._B[k, q] * Ex[q] Ex[k] = u + En[k] alpha[k] = a s.remove(k) coefs = np.insert(pred_model.coef_, target_index, 0) return (desired_output - np.dot(coefs, Ex) - pred_model.intercept_) / np.dot( coefs, alpha )