"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""
import itertools
import numbers
import warnings
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.utils import check_array
from .direct_lingam import DirectLiNGAM
from .hsic import hsic_test_gamma
from .utils import predict_adaptive_lasso, find_all_paths, calculate_total_effect
class LongitudinalLiNGAM:
    """Implementation of Longitudinal LiNGAM algorithm [1]_

    References
    ----------
    .. [1] K. Kadowaki, S. Shimizu, and T. Washio. Estimation of causal structures
       in longitudinal data using non-Gaussianity. In Proc. 23rd IEEE International
       Workshop on Machine Learning for Signal Processing (MLSP2013), pp. 1--6,
       Southampton, United Kingdom, 2013.
    """

    def __init__(self, n_lags=1, measure="pwling", random_state=None):
        """Construct a model.

        Parameters
        ----------
        n_lags : int, optional (default=1)
            Number of lags.
        measure : {'pwling', 'kernel'}, default='pwling'
            Measure to evaluate independence: 'pwling' or 'kernel'.
        random_state : int, optional (default=None)
            ``random_state`` is the seed used by the random number generator.
        """
        self._n_lags = n_lags
        self._measure = measure
        # NOTE(review): the seed is stored but ``bootstrap`` draws its resampling
        # indices via ``np.random.randint`` without consuming it — confirm
        # whether seeding was intended to be applied there.
        self._random_state = random_state
        self._causal_orders = None
        self._adjacency_matrices = None

    def fit(self, X_list):
        """Fit the model to datasets.

        Parameters
        ----------
        X_list : list, shape [X, ...]
            Longitudinal multiple datasets for training, where ``X`` is a dataset.
            The shape of ``X`` is (n_samples, n_features),
            where ``n_samples`` is the number of samples and ``n_features`` is
            the number of features.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        # Check parameters
        if not isinstance(X_list, (list, np.ndarray)):
            raise ValueError("X_list must be a array-like.")
        if len(X_list) < 2:
            raise ValueError("X_list must be a list containing at least two items")
        # T = number of timepoints, n = samples per timepoint, p = features.
        self._T = len(X_list)
        self._n = check_array(X_list[0]).shape[0]
        self._p = check_array(X_list[0]).shape[1]
        # Store each dataset transposed to (n_features, n_samples); every
        # timepoint must share the shape of the first one.
        X_t = []
        for X in X_list:
            X = check_array(X)
            if X.shape != (self._n, self._p):
                raise ValueError("X_list must be a list with the same shape")
            X_t.append(X.T)
        # Step 1: regress out lagged effects M(t,t-τ) to obtain residuals N(t).
        M_tau, N_t = self._compute_residuals(X_t)
        # Step 2: estimate instantaneous effects B(t,t) via DirectLiNGAM on N(t).
        B_t, causal_orders = self._estimate_instantaneous_effects(N_t)
        # Step 3: recover lagged effects B(t,t-τ) = (I - B(t,t)) M(t,t-τ).
        B_tau = self._estimate_lagged_effects(B_t, M_tau)
        # output B(t,t), B(t,t-τ); entries that cannot be estimated (e.g. t=0)
        # are left as NaN.
        self._adjacency_matrices = np.empty(
            (self._T, 1 + self._n_lags, self._p, self._p)
        )
        self._adjacency_matrices[:, :] = np.nan
        for t in range(1, self._T):
            self._adjacency_matrices[t, 0] = B_t[t]
            for l in range(self._n_lags):
                # NOTE(review): this guard skips only the case t == l; it looks
                # like it is meant to exclude lags reaching before t=0
                # (i.e. t - (l + 1) < 0), which it does not fully do when
                # n_lags > t + 1 — confirm intended behavior for large n_lags.
                if t - l != 0:
                    self._adjacency_matrices[t, l + 1] = B_tau[t, l]
        # Residuals are exposed per timepoint as (n_samples, n_features).
        self._residuals = np.zeros((self._T, self._n, self._p))
        for t in range(self._T):
            self._residuals[t] = N_t[t].T
        self._causal_orders = causal_orders
        return self

    def bootstrap(self, X_list, n_sampling, start_from_t=1):
        """Evaluate the statistical reliability of DAG based on the bootstrapping.

        Parameters
        ----------
        X_list : array-like, shape (X, ...)
            Longitudinal multiple datasets for training, where ``X`` is a dataset.
            The shape of ``X`` is (n_samples, n_features),
            where ``n_samples`` is the number of samples and ``n_features`` is
            the number of features.
        n_sampling : int
            Number of bootstrapping samples.
        start_from_t : int, optional (default=1)
            The first timepoint from which total effects are computed.

        Returns
        -------
        results : array-like, shape (BootstrapResult, ...)
            Returns the results of bootstrapping for multiple datasets.
        """
        # Check parameters
        if not isinstance(X_list, (list, np.ndarray)):
            raise ValueError("X_list must be a array-like.")
        if len(X_list) < 2:
            raise ValueError("X_list must be a list containing at least two items")
        self._T = len(X_list)
        self._n = check_array(X_list[0]).shape[0]
        self._p = check_array(X_list[0]).shape[1]
        X_t = []
        for X in X_list:
            X = check_array(X)
            if X.shape != (self._n, self._p):
                raise ValueError("X_list must be a list with the same shape")
            X_t.append(X)
        # Bootstrapping: total effects are stored on a time-expanded index,
        # variable i at timepoint t maps to row/column t * p + i.
        adjacency_matrices = np.zeros(
            (n_sampling, self._T, 1 + self._n_lags, self._p, self._p)
        )
        total_effects = np.zeros((n_sampling, self._T * self._p, self._T * self._p))
        for i in range(n_sampling):
            # Resample the same subjects (rows) across all timepoints so the
            # longitudinal structure of each sampled subject is preserved.
            resampled_X_t = np.empty((self._T, self._n, self._p))
            indices = np.random.randint(0, self._n, size=(self._n,))
            for t in range(self._T):
                resampled_X_t[t] = X_t[t][indices, :]
            self.fit(resampled_X_t)
            adjacency_matrices[i] = self._adjacency_matrices
            # Calculate total effects
            for from_t in range(start_from_t, self._T):
                for c, from_ in enumerate(self._causal_orders[from_t]):
                    # Effects within the same timepoint: only variables that
                    # come after ``from_`` in the causal order.
                    to_t = from_t
                    for to in self._causal_orders[from_t][c + 1 :]:
                        total_effects[
                            i, to_t * self._p + to, from_t * self._p + from_
                        ] = self.estimate_total_effect2(from_t, from_, to_t, to)
                    # Effects on every variable at all later timepoints.
                    for to_t in range(from_t + 1, self._T):
                        for to in self._causal_orders[to_t]:
                            total_effects[
                                i, to_t * self._p + to, from_t * self._p + from_
                            ] = self.estimate_total_effect2(from_t, from_, to_t, to)
        return LongitudinalBootstrapResult(self._T, adjacency_matrices, total_effects)

    def estimate_total_effect(self, X_t, from_t, from_index, to_t, to_index):
        """Estimate total effect using causal model.

        Parameters
        ----------
        X_t : array-like, shape (timepoint, n_samples, n_features)
            Original data, where n_samples is the number of samples
            and n_features is the number of features.
        from_t :
            The timepoint of source variable.
        from_index :
            Index of source variable to estimate total effect.
        to_t :
            The timepoint of destination variable.
        to_index :
            Index of destination variable to estimate total effect.

        Returns
        -------
        total_effect : float
            Estimated total effect.
        """
        # Check from/to causal order: warn (do not fail) when the requested
        # direction contradicts the estimated ordering.
        if to_t == from_t:
            from_order = self._causal_orders[to_t].index(from_index)
            to_order = self._causal_orders[from_t].index(to_index)
            if from_order > to_order:
                warnings.warn(
                    f"The estimated causal effect may be incorrect because "
                    f"the causal order of the destination variable (to_t={to_t}, to_index={to_index}) "
                    f"is earlier than the source variable (from_t={from_t}, from_index={from_index})."
                )
        elif to_t < from_t:
            warnings.warn(
                f"The estimated causal effect may be incorrect because "
                f"the causal order of the destination variable (to_t={to_t}) "
                f"is earlier than the source variable (from_t={from_t})."
            )
        # Build the regression design: destination-timepoint data in the first
        # p columns, then the source timepoint and its lags.
        # X + lagged X
        # n_features * (to + from + n_lags)
        X_joined = np.zeros((self._n, self._p * (2 + self._n_lags)))
        X_joined[:, 0 : self._p] = X_t[to_t]
        for tau in range(1 + self._n_lags):
            pos = self._p + self._p * tau
            X_joined[:, pos : pos + self._p] = X_t[from_t - tau]
        # Concatenate B(t,t), B(t,t-1), ... horizontally into one (p, p*(1+n_lags)) matrix.
        am = np.concatenate([*self._adjacency_matrices[to_t]], axis=1)
        # from_index + parents indices (offset by p to address the source-side
        # columns of X_joined); the source variable's coefficient comes first.
        parents = np.where(np.abs(am[from_index]) > 0)[0]
        predictors = [from_index + self._p]
        predictors.extend(parents + self._p)
        # Estimate total effect
        coefs = predict_adaptive_lasso(X_joined, predictors, to_index)
        return coefs[0]

    def estimate_total_effect2(self, from_t, from_index, to_t, to_index):
        """Estimate total effect using causal model.

        Unlike :meth:`estimate_total_effect`, this works purely on the fitted
        adjacency matrices (no data regression).

        Parameters
        ----------
        from_t :
            The timepoint of source variable.
        from_index :
            Index of source variable to estimate total effect.
        to_t :
            The timepoint of destination variable.
        to_index :
            Index of destination variable to estimate total effect.

        Returns
        -------
        total_effect : float
            Estimated total effect.
        """
        # Check from/to causal order: warn (do not fail) on contradictions.
        if to_t == from_t:
            from_order = self._causal_orders[to_t].index(from_index)
            to_order = self._causal_orders[from_t].index(to_index)
            if from_order > to_order:
                warnings.warn(
                    f"The estimated causal effect may be incorrect because "
                    f"the causal order of the destination variable (to_t={to_t}, to_index={to_index}) "
                    f"is earlier than the source variable (from_t={from_t}, from_index={from_index})."
                )
        elif to_t < from_t:
            warnings.warn(
                f"The estimated causal effect may be incorrect because "
                f"the causal order of the destination variable (to_t={to_t}) "
                f"is earlier than the source variable (from_t={from_t})."
            )
        # Stack [B(t,t) | B(t,t-1) | ...] horizontally, then zero-pad rows to
        # make the matrix square over the time-expanded variable index.
        am = np.concatenate([*self._adjacency_matrices[to_t]], axis=1)
        am = np.pad(am, [(0, am.shape[1] - am.shape[0]), (0, 0)])
        # Shift the source index into the block corresponding to its lag.
        from_index = from_index + self._p * (to_t - from_t)
        effect = calculate_total_effect(am, from_index, to_index)
        return effect

    def get_error_independence_p_values(self):
        """Calculate the p-value matrix of independence between error variables.

        Returns
        -------
        independence_p_values : array-like, shape (n_features, n_features)
            p-value matrix of independence between error variables.
        """
        # Recover error terms e(t) = (I - B(t,t)) N(t) from the residuals.
        E_list = np.empty((self._T, self._n, self._p))
        for t, resid in enumerate(self.residuals_):
            B_t = self._adjacency_matrices[t, 0]
            E_list[t] = np.dot(np.eye(B_t.shape[0]) - B_t, resid.T).T
        # t=0 has no estimated model, so its p-values stay NaN.
        p_values_list = np.zeros([self._T, self._p, self._p])
        p_values_list[:, :, :] = np.nan
        for t in range(1, self._T):
            p_values = np.zeros([self._p, self._p])
            # HSIC test for each unordered pair; the matrix is symmetric.
            for i, j in itertools.combinations(range(self._p), 2):
                _, p_value = hsic_test_gamma(
                    np.reshape(E_list[t][:, i], [self._n, 1]),
                    np.reshape(E_list[t][:, j], [self._n, 1]),
                )
                p_values[i, j] = p_value
                p_values[j, i] = p_value
            p_values_list[t] = p_values
        return p_values_list

    def _compute_residuals(self, X_t):
        """Compute residuals N(t).

        Regresses each timepoint's data on the previous ``n_lags`` timepoints
        and subtracts the fitted lagged contribution.
        """
        M_tau = np.zeros((self._T, self._n_lags, self._p, self._p))
        N_t = np.zeros((self._T, self._p, self._n))
        # t=0 has no predecessors; its residuals remain NaN.
        N_t[:, :, :] = np.nan
        for t in range(1, self._T):
            # predictors: previous timepoints stacked side by side,
            # shape (n_samples, p * n_lags).
            X_predictors = np.zeros((self._n, self._p * (1 + self._n_lags)))
            for tau in range(self._n_lags):
                pos = self._p * tau
                # NOTE(review): for t <= n_lags the index t - (tau + 1) can be
                # negative and wraps to the end of X_t (Python indexing) —
                # confirm this is the intended handling of early timepoints.
                X_predictors[:, pos : pos + self._p] = X_t[t - (tau + 1)].T
            # estimate M(t,t-τ) by regression, one target feature at a time.
            X_target = X_t[t].T
            for i in range(self._p):
                reg = LinearRegression()
                reg.fit(X_predictors, X_target[:, i])
                for tau in range(self._n_lags):
                    pos = self._p * tau
                    M_tau[t, tau, i] = reg.coef_[pos : pos + self._p]
            # Compute N(t) = X(t) - Σ_τ M(t,t-τ) X(t-τ).
            N_t[t] = X_t[t]
            for tau in range(self._n_lags):
                N_t[t] = N_t[t] - np.dot(M_tau[t, tau], X_t[t - (tau + 1)])
        return M_tau, N_t

    def _estimate_instantaneous_effects(self, N_t):
        """Estimate instantaneous effects B(t,t) by applying LiNGAM."""
        # t=0 gets a NaN placeholder causal order (no model is fitted there).
        causal_orders = [[np.nan] * self._p]
        B_t = np.zeros((self._T, self._p, self._p))
        for t in range(1, self._T):
            model = DirectLiNGAM(measure=self._measure)
            model.fit(N_t[t].T)
            causal_orders.append(model.causal_order_)
            B_t[t] = model.adjacency_matrix_
        return B_t, causal_orders

    def _estimate_lagged_effects(self, B_t, M_tau):
        """Estimate lagged effects B(t,t-τ) = (I - B(t,t)) M(t,t-τ)."""
        B_tau = np.zeros((self._T, self._n_lags, self._p, self._p))
        for t in range(self._T):
            for tau in range(self._n_lags):
                B_tau[t, tau] = np.dot(np.eye(self._p) - B_t[t], M_tau[t, tau])
        return B_tau

    @property
    def causal_orders_(self):
        """Estimated causal ordering.

        Returns
        -------
        causal_order_ : array-like, shape (causal_order, ...)
            The causal order of fitted models for B(t,t).
            The shape of causal_order is (n_features),
            where ``n_features`` is the number of features.
        """
        return self._causal_orders

    @property
    def adjacency_matrices_(self):
        """Estimated adjacency matrices.

        Returns
        -------
        adjacency_matrices_ : array-like, shape ((B(t,t), B(t,t-1), ..., B(t,t-τ)), ...)
            The list of adjacency matrix B(t,t) and B(t,t-τ) for longitudinal datasets.
            The shape of B(t,t) and B(t,t-τ) is (n_features, n_features), where
            ``n_features`` is the number of features.
            **If the previous data required for the calculation are not available,
            such as B(t,t) or B(t,t-τ) at t=0, all elements of the matrix are nan**.
        """
        return self._adjacency_matrices

    @property
    def residuals_(self):
        """Residuals of regression.

        Returns
        -------
        residuals_ : list, shape [E, ...]
            Residuals of regression, where ``E`` is a dataset.
            The shape of ``E`` is (n_samples, n_features),
            where ``n_samples`` is the number of samples and ``n_features`` is
            the number of features.
        """
        return self._residuals
class LongitudinalBootstrapResult(object):
    """The result of bootstrapping for LongitudinalLiNGAM."""

    def __init__(self, n_timepoints, adjacency_matrices, total_effects):
        """Construct a BootstrapResult.

        Parameters
        ----------
        n_timepoints : int
            Number of timepoints.
        adjacency_matrices : array-like, shape (n_sampling)
            The adjacency matrix list by bootstrapping.
        total_effects : array-like, shape (n_sampling)
            The total effects list by bootstrapping.
        """
        self._n_timepoints = n_timepoints
        self._adjacency_matrices = adjacency_matrices
        self._total_effects = total_effects

    @property
    def adjacency_matrices_(self):
        """The adjacency matrix list by bootstrapping.

        Returns
        -------
        adjacency_matrices_ : array-like, shape (n_sampling)
            The adjacency matrix list, where ``n_sampling`` is
            the number of bootstrap sampling.
        """
        return self._adjacency_matrices

    @property
    def total_effects_(self):
        """The total effect list by bootstrapping.

        Returns
        -------
        total_effects_ : array-like, shape (n_sampling)
            The total effect list, where ``n_sampling`` is
            the number of bootstrap sampling.
        """
        return self._total_effects

    def get_causal_direction_counts(
        self,
        n_directions=None,
        min_causal_effect=None,
        split_by_causal_effect_sign=False,
    ):
        """Get causal direction count as a result of bootstrapping.

        Parameters
        ----------
        n_directions : int, optional (default=None)
            If int, then The top ``n_directions`` items are included in the result
        min_causal_effect : float, optional (default=None)
            Threshold for detecting causal direction.
            If float, then causal directions with absolute values of causal
            effects less than ``min_causal_effect`` are excluded.
        split_by_causal_effect_sign : boolean, optional (default=False)
            If True, then causal directions are split depending on the sign of the causal effect.

        Returns
        -------
        causal_direction_counts : dict
            List of causal directions sorted by count in descending order.
            The dictionary has the following format::

                {'from': [n_directions], 'to': [n_directions], 'count': [n_directions]}

            where ``n_directions`` is the number of causal directions.
        """
        # Check parameters
        if isinstance(n_directions, (numbers.Integral, np.integer)):
            if not 0 < n_directions:
                raise ValueError("n_directions must be an integer greater than 0")
        elif n_directions is None:
            pass
        else:
            raise ValueError("n_directions must be an integer greater than 0")
        if min_causal_effect is None:
            min_causal_effect = 0.0
        else:
            if not 0.0 < min_causal_effect:
                raise ValueError("min_causal_effect must be an value greater than 0.")
        # Count causal directions, one result dict per timepoint.
        cdc_list = []
        for t in range(self._n_timepoints):
            directions = []
            for m in self._adjacency_matrices:
                # Stack [B(t,t) | B(t,t-1) | ...] horizontally; rows index the
                # destination variable, columns the (lag-offset) source.
                am = np.concatenate([*m[t]], axis=1)
                direction = np.array(np.where(np.abs(am) > min_causal_effect))
                if split_by_causal_effect_sign:
                    # Append the sign of each effect as a third row so that
                    # positive and negative effects are counted separately.
                    signs = (
                        np.array([np.sign(am[i][j]) for i, j in direction.T])
                        .astype("int64")
                        .T
                    )
                    direction = np.vstack([direction, signs])
                directions.append(direction.T)
            directions = np.concatenate(directions)
            if len(directions) == 0:
                cdc = {"from": [], "to": [], "count": []}
                if split_by_causal_effect_sign:
                    cdc["sign"] = []
                cdc_list.append(cdc)
                continue
            directions, counts = np.unique(directions, axis=0, return_counts=True)
            # Sort by count, descending; optionally truncate to the top n.
            sort_order = np.argsort(-counts)
            sort_order = (
                sort_order[:n_directions] if n_directions is not None else sort_order
            )
            counts = counts[sort_order]
            directions = directions[sort_order]
            # Column 0 of each row is the destination index, column 1 the source.
            cdc = {
                "from": directions[:, 1].tolist(),
                "to": directions[:, 0].tolist(),
                "count": counts.tolist(),
            }
            if split_by_causal_effect_sign:
                cdc["sign"] = directions[:, 2].tolist()
            cdc_list.append(cdc)
        return cdc_list

    def get_directed_acyclic_graph_counts(
        self, n_dags=None, min_causal_effect=None, split_by_causal_effect_sign=False
    ):
        """Get DAGs count as a result of bootstrapping.

        Parameters
        ----------
        n_dags : int, optional (default=None)
            If int, then The top ``n_dags`` items are included in the result
        min_causal_effect : float, optional (default=None)
            Threshold for detecting causal direction.
            If float, then causal directions with absolute values of causal effects less than
            ``min_causal_effect`` are excluded.
        split_by_causal_effect_sign : boolean, optional (default=False)
            If True, then causal directions are split depending on the sign of the causal effect.

        Returns
        -------
        directed_acyclic_graph_counts : dict
            List of directed acyclic graphs sorted by count in descending order.
            The dictionary has the following format::

                {'dag': [n_dags], 'count': [n_dags]}.

            where ``n_dags`` is the number of directed acyclic graphs.
        """
        # Check parameters
        if isinstance(n_dags, (numbers.Integral, np.integer)):
            if not 0 < n_dags:
                raise ValueError("n_dags must be an integer greater than 0")
        elif n_dags is None:
            pass
        else:
            raise ValueError("n_dags must be an integer greater than 0")
        if min_causal_effect is None:
            min_causal_effect = 0.0
        else:
            if not 0.0 < min_causal_effect:
                raise ValueError("min_causal_effect must be an value greater than 0.")
        # Count directed acyclic graphs, one result dict per timepoint.
        dagc_list = []
        for t in range(self._n_timepoints):
            dags = []
            for m in self._adjacency_matrices:
                # Threshold the stacked adjacency matrix into a boolean DAG.
                am = np.concatenate([*m[t]], axis=1)
                dag = np.abs(am) > min_causal_effect
                if split_by_causal_effect_sign:
                    # Replace booleans with the effect signs so DAGs that
                    # differ only in sign are counted as distinct.
                    direction = np.array(np.where(dag))
                    signs = np.zeros_like(dag).astype("int64")
                    for i, j in direction.T:
                        signs[i][j] = np.sign(am[i][j]).astype("int64")
                    dag = signs
                dags.append(dag)
            dags, counts = np.unique(dags, axis=0, return_counts=True)
            # Sort by count, descending; optionally truncate to the top n.
            sort_order = np.argsort(-counts)
            sort_order = sort_order[:n_dags] if n_dags is not None else sort_order
            counts = counts[sort_order]
            dags = dags[sort_order]
            # np.where rows: [0] = destination ('to'), [1] = source ('from').
            if split_by_causal_effect_sign:
                dags = [
                    {
                        "from": np.where(dag)[1].tolist(),
                        "to": np.where(dag)[0].tolist(),
                        "sign": [dag[i][j] for i, j in np.array(np.where(dag)).T],
                    }
                    for dag in dags
                ]
            else:
                dags = [
                    {"from": np.where(dag)[1].tolist(), "to": np.where(dag)[0].tolist()}
                    for dag in dags
                ]
            dagc_list.append({"dag": dags, "count": counts.tolist()})
        return dagc_list

    def get_probabilities(self, min_causal_effect=None):
        """Get bootstrap probability.

        Parameters
        ----------
        min_causal_effect : float, optional (default=None)
            Threshold for detecting causal direction.
            If float, then causal directions with absolute values of causal effects less than
            ``min_causal_effect`` are excluded.

        Returns
        -------
        probabilities : array-like
            List of bootstrap probability matrix.
        """
        # check parameters
        if min_causal_effect is None:
            min_causal_effect = 0.0
        else:
            if not 0.0 < min_causal_effect:
                raise ValueError("min_causal_effect must be an value greater than 0.")
        # Fraction of bootstrap samples in which each edge exceeds the threshold.
        prob = np.zeros(self._adjacency_matrices[0].shape)
        for adj_mat in self._adjacency_matrices:
            prob += np.where(np.abs(adj_mat) > min_causal_effect, 1, 0)
        prob = prob / len(self._adjacency_matrices)
        return prob

    def get_total_causal_effects(self, min_causal_effect=None):
        """Get total effects list.

        Parameters
        ----------
        min_causal_effect : float, optional (default=None)
            Threshold for detecting causal direction.
            If float, then causal directions with absolute values of causal effects less than
            ``min_causal_effect`` are excluded.

        Returns
        -------
        total_causal_effects : dict
            List of bootstrap total causal effect sorted by probability in descending order.
            The dictionary has the following format::

                {'from': [n_directions], 'to': [n_directions], 'effect': [n_directions], 'probability': [n_directions]}

            where ``n_directions`` is the number of causal directions.
        """
        # Check parameters
        if min_causal_effect is None:
            min_causal_effect = 0.0
        else:
            if not 0.0 < min_causal_effect:
                raise ValueError("min_causal_effect must be an value greater than 0.")
        # probability: fraction of samples in which each (to, from) effect
        # exceeds the threshold.
        probs = np.sum(
            np.where(np.abs(self._total_effects) > min_causal_effect, 1, 0),
            axis=0,
            keepdims=True,
        )[0]
        probs = probs / len(self._total_effects)
        # causal directions: dirs[0] holds 'to' indices, dirs[1] holds 'from'.
        dirs = np.array(np.where(np.abs(probs) > 0))
        probs = probs[dirs[0], dirs[1]]
        # calculate median effect without zero
        effects = np.zeros(dirs.shape[1])
        for i, (to, from_) in enumerate(dirs.T):
            idx = np.where(np.abs(self._total_effects[:, to, from_]) > 0)
            effects[i] = np.median(self._total_effects[:, to, from_][idx])
        # sort by probability, descending
        order = np.argsort(-probs)
        dirs = dirs.T[order]
        effects = effects[order]
        probs = probs[order]
        ce = {
            "from": dirs[:, 1].tolist(),
            "to": dirs[:, 0].tolist(),
            "effect": effects.tolist(),
            "probability": probs.tolist(),
        }
        return ce

    def get_paths(self, from_index, to_index, from_t, to_t, min_causal_effect=None):
        """Get all paths from the start variable to the end variable and their bootstrap probabilities.

        Parameters
        ----------
        from_index : int
            Index of the variable at the start of the path.
        to_index : int
            Index of the variable at the end of the path.
        from_t : int
            The starting timepoint of the path.
        to_t : int
            The end timepoint of the path.
        min_causal_effect : float, optional (default=None)
            Threshold for detecting causal direction.
            Causal directions with absolute values of causal effects less than ``min_causal_effect`` are excluded.

        Returns
        -------
        paths : dict
            List of path and bootstrap probability.
            The dictionary has the following format::

                {'path': [n_paths], 'effect': [n_paths], 'probability': [n_paths]}

            where ``n_paths`` is the number of paths.
        """
        # check parameters
        if min_causal_effect is None:
            min_causal_effect = 0.0
        else:
            if not 0.0 < min_causal_effect:
                raise ValueError("min_causal_effect must be an value greater than 0.")
        if to_t < from_t:
            raise ValueError("to_t should be greater than or equal to from_t.")
        if to_t == from_t:
            if to_index == from_index:
                raise ValueError("The same variable is specified for from and to.")
        # Find all paths from from_index to to_index
        paths_list = []
        effects_list = []
        for am in self._adjacency_matrices:
            # Build a time-expanded adjacency matrix covering the timepoints
            # from_t .. to_t: block (i, j) is the effect from timepoint
            # from_t + j onto timepoint from_t + i, i.e. B(from_t+i, lag i-j),
            # which is stored at am[i + from_t, i - j] (lag index 0 is the
            # instantaneous matrix).
            n_timepoints = to_t - from_t + 1
            n_features = am.shape[2]
            expansion_m_size = n_features * n_timepoints
            expansion_m = np.zeros((expansion_m_size, expansion_m_size))
            for i in range(n_timepoints):
                for j in range(n_timepoints):
                    row = n_features * i
                    col = n_features * j
                    # Only same-time or backward-in-time blocks are filled;
                    # future timepoints cannot cause past ones.
                    if col > row:
                        continue
                    expansion_m[row : row + n_features, col : col + n_features] = am[
                        i + from_t, i - j
                    ]
            paths, effects = find_all_paths(
                expansion_m,
                int(from_index),
                int(n_features * (to_t - from_t) + to_index),
                min_causal_effect,
            )
            # Convert path to string to make them easier to handle.
            paths_list.extend(["_".join(map(str, p)) for p in paths])
            effects_list.extend(effects)
        paths_list = np.array(paths_list)
        effects_list = np.array(effects_list)
        # Count paths
        paths_str, counts = np.unique(paths_list, axis=0, return_counts=True)
        # Sort by count
        order = np.argsort(-counts)
        probs = counts[order] / len(self._adjacency_matrices)
        paths_str = paths_str[order]
        # Calculate median of causal effect for each path
        effects = [
            np.median(effects_list[np.where(paths_list == p)]) for p in paths_str
        ]
        result = {
            "path": [[int(i) for i in p.split("_")] for p in paths_str],
            "effect": effects,
            "probability": probs.tolist(),
        }
        return result