"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""
import warnings
import numbers
import numpy as np
from scipy.stats import chi2
from sklearn.utils import check_array
from .hsic import hsic_test_gamma
from .utils import find_all_paths
[docs]
class LongitudinalRESIT:
"""Implementation of RESIT(regression with subsequent independence test) Algorithm [1]_ for longitudinal data
References
----------
.. [1] Jonas Peters, Joris M Mooij, Dominik Janzing, and Bernhard Sch ̈olkopf.
Causal discovery with continuous additive noise models.
Journal of Machine Learning Research, 15:2009-2053, 2014.
Notes
-----
RESIT algorithm returns an **adjacency matrix consisting of zeros or ones**,
rather than an adjacency matrix consisting of causal coefficients,
in order to estimate nonlinear causality.
"""
[docs]
def __init__(
self,
regressor,
n_lags=1,
prior_knowledge=None,
prior_knowledge_lag=None,
alpha=0.01,
is_common_graph=True,
):
"""Construct a LongitudinalRESIT model.
Parameters
----------
regressor : regressor object implementing 'fit' and 'predict' function (default=None)
Regressor to compute residuals.
This regressor object must have ``fit`` method and ``predict`` function like scikit-learn's model.
n_lags : int, optional (default=1)
Number of lags (past time points) to use. Must be an integer >= 1.
prior_knowledge : array-like, optional (default=None)
Prior knowledge used for the instantaneous causal graph,
assuming a common causal structure across all time points.
A single matrix of shape (n_vars, n_vars).
Values: 0=no edge, 1=edge exists, -1=unknown.
prior_knowledge_lag : array-like, optional (default=None)
Prior knowledge used for the lag causal graph,
assuming a common causal structure across all time points.
A single tensor of shape (n_lags, n_vars, n_vars).
Values: 0=no edge, 1=edge exists, -1=unknown.
alpha : float, optional (default=0.01)
Alpha level for HSIC independence test when removing superfluous edges.
is_common_graph : bool, optional (default=True)
If True, estimate a single causal graph shared across all time points.
If False, estimate an independent causal graph for each time point.
"""
if regressor is None:
raise ValueError("Specify regression model in 'regressor'.")
if not (hasattr(regressor, "fit") and hasattr(regressor, "predict")):
raise ValueError("'regressor' has no fit or predict method.")
if not isinstance(alpha, numbers.Number) or alpha < 0.0:
raise ValueError("alpha must be a float greater than 0.")
if not isinstance(n_lags, int) or n_lags < 1:
raise ValueError("n_lags must be an integer greater than or equal to 1.")
self._reg = regressor
self._n_lags = n_lags
self._alpha = alpha
self._is_common_graph = is_common_graph
self._causal_order = None
self._B_list = None
self._A_list = None
self._adjacency_matrices = None
self._prior_knowledge_raw = prior_knowledge
self._prior_knowledge_lag_raw = prior_knowledge_lag
self._Aknw = None
self._Aknw_lag = None
self._partial_orders = None
[docs]
def fit(self, X_list):
"""Fit the model to X_list.
Parameters
----------
X_list : list of ndarray, shape [(n_samples, n_vars), ...]
Multiple datasets for training, where each dataset corresponds to a time point.
The length of X_list is T (number of time points). T >= 2 is required.
Returns
-------
self : object
Returns the instance itself.
"""
X_list = self._validate_X_list(X_list)
n_vars = X_list[0].shape[1]
T = len(X_list)
self._validate_prior_knowledge(n_vars, T)
causal_order = self._estimate_causal_order(X_list)
B_list = self._estimate_instantaneous_graph(X_list, causal_order)
A_list = self._estimate_lag_graph(X_list, causal_order, B_list)
self._causal_order = causal_order
self._B_list = B_list
self._A_list = A_list
self._adjacency_matrices = self._concat_B_A(B_list, A_list, T)
return self
[docs]
def bootstrap(self, X_list, n_sampling):
"""Evaluate the statistical reliability of DAG based on the bootstrapping.
Parameters
----------
X_list : list of ndarray, shape [(n_samples, n_vars), ...]
Multiple datasets for training, where each dataset corresponds to a time point.
The length of X_list is T (number of time points). T >= 2 is required.
n_sampling : int
Number of bootstrapping samples.
Returns
-------
results : LongitudinalRESITBootstrapResult
Returns the results of bootstrapping for multiple datasets.
"""
# Check parameters
if not isinstance(X_list, (list, np.ndarray)):
raise ValueError("X_list must be a array-like.")
if len(X_list) < 2:
raise ValueError("X_list must be a list containing at least two items")
X_list = self._validate_X_list(X_list)
n_vars = X_list[0].shape[1]
n_samples = X_list[0].shape[0]
T = len(X_list)
causal_orders = []
B_lists = []
A_lists = []
adjacency_matrices_list = []
# Bootstrapping
for i in range(n_sampling):
resampled_X_t = np.empty((T, n_samples, n_vars))
indices = np.random.randint(0, n_samples, size=(n_samples,))
for t in range(T):
resampled_X_t[t] = X_list[t][indices, :]
self.fit(resampled_X_t)
causal_orders.append(self.causal_order_)
B_lists.append(self.B_list_)
A_lists.append(self.A_list_)
adjacency_matrices_list.append(self.adjacency_matrices_)
return LongitudinalRESITBootstrapResult(
T,
causal_orders,
B_lists,
A_lists,
np.array(adjacency_matrices_list),
self._is_common_graph,
)
@property
def causal_order_(self):
"""Instantaneous causal order common to all time points.
Returns
-------
causal_order_ : list of int
The causal order of the fitted model, where the first element is the most
upstream variable and the last is the sink.
"""
if self._causal_order is None:
raise ValueError("fit() has not been called.")
return self._causal_order
@property
def B_list_(self):
"""Estimated instantaneous causal graphs.
If is_common_graph=True, a list of length 1 (shared across all time points).
If is_common_graph=False, a list of length T; B_list_[0], ..., B_list_[n_lags-1] are NaN matrices.
Each element has shape (n_vars, n_vars).
B_list_[t][j, i] = 1 means there is an edge from variable i to variable j.
Returns
-------
B_list_ : list of ndarray
"""
if self._B_list is None:
raise ValueError("fit() has not been called.")
return self._B_list
@property
def A_list_(self):
"""Estimated lag causal graphs.
If is_common_graph=True, a list of length 1.
If is_common_graph=False, a list of length T; A_list_[0], ..., A_list_[n_lags-1] are NaN tensors.
Each element has shape (n_lags, n_vars, n_vars).
A_list_[t][k, j, i] = 1 means there is an edge from variable i at k+1 time steps
before time t to variable j at time t (k is 0-origin).
Returns
-------
A_list_ : list of ndarray
"""
if self._A_list is None:
raise ValueError("fit() has not been called.")
return self._A_list
@property
def adjacency_matrices_(self):
"""Estimated adjacency matrices combining instantaneous and lag graphs.
If is_common_graph=True, a list of length 1.
If is_common_graph=False, a list of length T; adjacency_matrices_[0], ..., adjacency_matrices_[n_lags-1] are NaN matrices.
Each element has shape (n_vars, n_vars*(n_lags+1)).
The first n_vars columns correspond to the instantaneous graph B, and the next n_vars columns correspond to A[0], and so on.
Returns
-------
adjacency_matrices_ : list of ndarray
"""
if self._adjacency_matrices is None:
raise ValueError("fit() has not been called.")
return self._adjacency_matrices
def _validate_X_list(self, X_list):
"""Check input X list."""
if not isinstance(X_list, (list, np.ndarray)):
raise ValueError("X_list must be a array-like.")
if len(X_list) < 2:
raise ValueError("X_list must contain at least 2 time points.")
X_list_ = []
n_vars = None
n_samples = None
for X in X_list:
X_ = check_array(X)
if n_vars is None:
n_vars = X_.shape[1]
elif X_.shape[1] != n_vars:
raise ValueError(
"All matrices in X_list must have the same number of columns."
)
if n_samples is None:
n_samples = X_.shape[0]
elif X_.shape[0] != n_samples:
raise ValueError(
"All matrices in X_list must have the same number of rows."
)
X_list_.append(X_)
if len(X_list_) < self._n_lags + 1:
warnings.warn(
f"T={len(X_list_)} < n_lags+1={self._n_lags + 1}. "
"Lag variables are only available for some time points.",
UserWarning,
stacklevel=3,
)
return X_list_
def _validate_prior_knowledge(self, n_vars, T):
"""Validate and convert prior_knowledge and prior_knowledge_lag."""
# prior_knowledge processing
if self._prior_knowledge_raw is None:
self._Aknw = None
else:
if isinstance(self._prior_knowledge_raw, list):
raise ValueError(
"prior_knowledge must be a single matrix of shape "
f"({n_vars}, {n_vars}), not a list."
)
arr = check_array(self._prior_knowledge_raw)
if arr.shape != (n_vars, n_vars):
raise ValueError(
f"prior_knowledge shape {arr.shape} must be ({n_vars}, {n_vars})."
)
self._Aknw = np.where(arr < 0, np.nan, arr)
# Update partial_orders for Phase 1
if self._Aknw is not None:
self._partial_orders = self._extract_partial_orders(self._Aknw)
else:
self._partial_orders = None
# prior_knowledge_lag processing
if self._prior_knowledge_lag_raw is None:
self._Aknw_lag = None
else:
if isinstance(self._prior_knowledge_lag_raw, list):
raise ValueError(
"prior_knowledge_lag must be a single tensor of shape "
f"({self._n_lags}, {n_vars}, {n_vars}), not a list."
)
arr = np.asarray(self._prior_knowledge_lag_raw, dtype=float)
if arr.shape != (self._n_lags, n_vars, n_vars):
raise ValueError(
f"prior_knowledge_lag shape {arr.shape} must be "
f"({self._n_lags}, {n_vars}, {n_vars})."
)
self._Aknw_lag = np.where(arr < 0, np.nan, arr)
def _extract_partial_orders(self, pk):
"""Extract partial orders from prior knowledge."""
path_pairs = np.array(np.where(pk == 1)).transpose()
no_path_pairs = np.array(np.where(pk == 0)).transpose()
# Check for inconsistencies in pairs with path
check_pairs = np.concatenate([path_pairs, path_pairs[:, [1, 0]]])
if len(check_pairs) > 0:
pairs, counts = np.unique(check_pairs, axis=0, return_counts=True)
if len(pairs[counts > 1]) > 0:
raise ValueError(
"The prior knowledge contains inconsistencies at the following indices: "
f"{pairs[counts > 1].tolist()}"
)
# Check for inconsistencies in pairs without path
# If there are duplicate pairs without path, they cancel out and are not ordered.
check_pairs = np.concatenate([no_path_pairs, no_path_pairs[:, [1, 0]]])
if len(check_pairs) > 0:
pairs, counts = np.unique(check_pairs, axis=0, return_counts=True)
check_pairs = np.concatenate([no_path_pairs, pairs[counts > 1]])
pairs, counts = np.unique(check_pairs, axis=0, return_counts=True)
no_path_pairs = pairs[counts < 2]
check_pairs = np.concatenate([path_pairs, no_path_pairs[:, [1, 0]]])
if len(check_pairs) == 0:
return check_pairs
pairs = np.unique(check_pairs, axis=0)
return pairs[:, [1, 0]] # [to, from] -> [from, to]
def _search_candidate(self, S):
"""Search for candidate features"""
if self._Aknw is None:
return S
if self._partial_orders is not None and len(self._partial_orders) != 0:
Sc = [i for i in S if i not in self._partial_orders[:, 0]]
return Sc
return S
def _apply_prior_knowledge_to_parents(self, j, parents):
"""Filter parent candidates for Phase 2 by removing edges where prior_knowledge=0."""
if self._Aknw is None:
return list(parents)
return [i for i in parents if self._Aknw[j, i] != 0]
def _apply_prior_knowledge_to_lag_parents(self, j, lag_parents):
"""Filter lag parent candidates for Phase 3 by removing edges where prior_knowledge_lag=0."""
if self._Aknw_lag is None:
return list(lag_parents)
return [(k, i) for (k, i) in lag_parents if self._Aknw_lag[k - 1, j, i] != 0]
def _compute_residual(self, y, Z):
"""Regress y on Z and return the residuals. Returns y as-is when Z is empty."""
if Z.shape[1] == 0:
return y
self._reg.fit(Z, y)
return y - self._reg.predict(Z)
def _fisher_combine_pvalues(self, p_values):
"""Combine a list of p-values using Fisher's method and return the combined p-value."""
if len(p_values) == 0:
raise ValueError("p_values must not be empty.")
chi2_stat = sum(np.inf if p == 0 else -2 * np.log(p) for p in p_values)
return float(chi2.sf(chi2_stat, df=2 * len(p_values)))
def _concat_B_A(self, B_list, A_list, T):
"""Concatenate B_list and A_list into a single matrix for counting causal directions."""
C_list = []
for B, A in zip(B_list, A_list):
B = np.asarray(B)
A = np.asarray(A)
blocks = [B] + [A[k] for k in range(A.shape[0])]
C_list.append(np.concatenate(blocks, axis=1))
if self._is_common_graph:
C_list = [C_list[0]] * T
return np.array(C_list)
# ------------------------------------------------------------------
# Phase 1: Causal order estimation
# ------------------------------------------------------------------
def _estimate_causal_order(self, X_list):
"""Determine topological order"""
n_vars = X_list[0].shape[1]
S = list(range(n_vars))
causal_order = []
for _ in range(n_vars):
Sc = self._search_candidate(S)
if len(Sc) == 1:
sink = Sc[0]
else:
scores = [self._compute_sink_score(X_list, j, S) for j in Sc]
sink = Sc[int(np.argmax(scores))]
causal_order.insert(0, sink)
S.remove(sink)
if (
self._Aknw is not None
and self._partial_orders is not None
and len(self._partial_orders) > 0
):
self._partial_orders = self._partial_orders[
self._partial_orders[:, 1] != sink
]
return causal_order
def _compute_sink_score(self, X_list, j, S):
"""Compute the sink score (Fisher-combined p-value) for variable j."""
T = len(X_list)
p_values = []
for t in range(1, T):
n_samples = X_list[t].shape[0]
parts = []
same_time_cols = [i for i in S if i != j]
if same_time_cols:
parts.append(X_list[t][:, same_time_cols])
for k in range(1, min(t, self._n_lags) + 1):
parts.append(X_list[t - k])
Z = np.hstack(parts) if parts else np.empty((n_samples, 0))
residual = self._compute_residual(X_list[t][:, j], Z)
_, p_t = hsic_test_gamma(residual, Z)
p_values.append(p_t)
return self._fisher_combine_pvalues(p_values)
# ------------------------------------------------------------------
# Phase 2: Instantaneous causal graph estimation
# ------------------------------------------------------------------
def _estimate_instantaneous_graph(self, X_list, causal_order):
"""Dispatch Phase 2 processing based on is_common_graph."""
if self._is_common_graph:
return self._estimate_common_instantaneous_graph(X_list, causal_order)
return self._estimate_separate_instantaneous_graph(X_list, causal_order)
def _estimate_common_instantaneous_graph(self, X_list, causal_order):
"""Estimate the instantaneous causal graph B shared across all time points (Phase 2, is_common_graph=True)."""
n_vars = X_list[0].shape[1]
B = np.zeros((n_vars, n_vars))
for j_idx, j in enumerate(causal_order):
if j_idx == 0:
continue
parents = self._apply_prior_knowledge_to_parents(j, causal_order[:j_idx])
for i in list(parents):
p_combined = self._test_edge_combined(X_list, j, i, parents)
if p_combined < self._alpha:
B[j, i] = 1
else:
parents.remove(i)
return [B]
def _estimate_separate_instantaneous_graph(self, X_list, causal_order):
"""Estimate independent instantaneous causal graphs per time point (Phase 2, is_common_graph=False)."""
T = len(X_list)
n_vars = X_list[0].shape[1]
B_list = [None] * T
for t in range(self._n_lags):
B_list[t] = np.full((n_vars, n_vars), np.nan)
for t in range(self._n_lags, T):
B_list[t] = np.zeros((n_vars, n_vars))
for j_idx, j in enumerate(causal_order):
if j_idx == 0:
continue
parents_t = self._apply_prior_knowledge_to_parents(
j, causal_order[:j_idx]
)
for i in list(parents_t):
p = self._test_edge_single(X_list, t, j, i, parents_t)
if p < self._alpha:
B_list[t][j, i] = 1
else:
parents_t.remove(i)
return B_list
def _test_edge_combined(self, X_list, j, i, parents):
"""Return the Fisher-combined p-value across all time points to determine whether to remove edge i->j."""
T = len(X_list)
p_values = []
for t in range(1, T):
n_samples = X_list[t].shape[0]
parts_Z = []
same_time_cols = [p for p in parents if p != i]
if same_time_cols:
parts_Z.append(X_list[t][:, same_time_cols])
for k in range(1, min(t, self._n_lags) + 1):
parts_Z.append(X_list[t - k])
Z = np.hstack(parts_Z) if parts_Z else np.empty((n_samples, 0))
X_parents = X_list[t][:, parents]
residual = self._compute_residual(X_list[t][:, j], Z)
_, p_t = hsic_test_gamma(residual, X_parents)
p_values.append(p_t)
return self._fisher_combine_pvalues(p_values)
def _test_edge_single(self, X_list, t, j, i, parents):
"""Return the p-value at time point t alone to determine whether to remove edge i->j."""
n_samples = X_list[t].shape[0]
parts_Z = []
same_time_cols = [p for p in parents if p != i]
if same_time_cols:
parts_Z.append(X_list[t][:, same_time_cols])
for k in range(1, min(t, self._n_lags) + 1):
parts_Z.append(X_list[t - k])
Z = np.hstack(parts_Z) if parts_Z else np.empty((n_samples, 0))
X_parents = X_list[t][:, parents]
residual = self._compute_residual(X_list[t][:, j], Z)
_, p = hsic_test_gamma(residual, X_parents)
return p
# ------------------------------------------------------------------
# Phase 3: Lag causal graph estimation
# ------------------------------------------------------------------
def _estimate_lag_graph(self, X_list, causal_order, B_list):
"""Dispatch Phase 3 processing based on is_common_graph."""
if self._is_common_graph:
return self._estimate_common_lag_graph(X_list, causal_order, B_list[0])
return self._estimate_separate_lag_graph(X_list, causal_order, B_list)
def _estimate_common_lag_graph(self, X_list, causal_order, B):
"""Estimate the lag causal graph A shared across all time points (Phase 3, is_common_graph=True)."""
n_vars = X_list[0].shape[1]
A = np.zeros((self._n_lags, n_vars, n_vars))
for j in causal_order:
confirmed = np.where(B[j, :] == 1)[0].tolist()
lag_parents = [
(k, i) for k in range(1, self._n_lags + 1) for i in range(n_vars)
]
lag_parents = self._apply_prior_knowledge_to_lag_parents(j, lag_parents)
for k_test, i_test in list(lag_parents):
p_combined = self._test_lag_edge_combined(
X_list, j, k_test, i_test, lag_parents, confirmed
)
if p_combined < self._alpha:
A[k_test - 1, j, i_test] = 1 # k_test: 1-origin -> 0-origin
else:
lag_parents.remove((k_test, i_test))
return [A]
def _estimate_separate_lag_graph(self, X_list, causal_order, B_list):
"""Estimate independent lag causal graphs per time point (Phase 3, is_common_graph=False)."""
T = len(X_list)
n_vars = X_list[0].shape[1]
A_list = [None] * T
for t in range(self._n_lags):
A_list[t] = np.full((self._n_lags, n_vars, n_vars), np.nan)
for t in range(self._n_lags, T):
A_list[t] = np.zeros((self._n_lags, n_vars, n_vars))
for j in causal_order:
confirmed = np.where(B_list[t][j, :] == 1)[0].tolist()
lag_parents = [
(k, i) for k in range(1, self._n_lags + 1) for i in range(n_vars)
]
lag_parents = self._apply_prior_knowledge_to_lag_parents(j, lag_parents)
for k_test, i_test in list(lag_parents):
p = self._test_lag_edge_single(
X_list, t, j, k_test, i_test, lag_parents, confirmed
)
if p < self._alpha:
A_list[t][k_test - 1, j, i_test] = 1
else:
lag_parents.remove((k_test, i_test))
return A_list
def _test_lag_edge_combined(
self, X_list, j, k_test, i_test, lag_parents, confirmed_parents
):
"""Return the Fisher-combined p-value across all time points for lag edge (k_test, i_test)->j."""
T = len(X_list)
p_values = []
for t in range(1, T):
if t < k_test:
continue
n_samples = X_list[t].shape[0]
parts_Z = []
if confirmed_parents:
parts_Z.append(X_list[t][:, confirmed_parents])
for k, i in lag_parents:
if k <= t and (k, i) != (k_test, i_test):
parts_Z.append(X_list[t - k][:, i].reshape(-1, 1))
Z = np.hstack(parts_Z) if parts_Z else np.empty((n_samples, 0))
lag_cols = []
for k, i in lag_parents:
if k <= t:
lag_cols.append(X_list[t - k][:, i].reshape(-1, 1))
X_lag_all = np.hstack(lag_cols) if lag_cols else np.empty((n_samples, 0))
if X_lag_all.shape[1] == 0:
continue
residual = self._compute_residual(X_list[t][:, j], Z)
_, p = hsic_test_gamma(residual, X_lag_all)
p_values.append(p)
if not p_values:
return 1.0 # No valid time points -> treat as no edge (independent)
return self._fisher_combine_pvalues(p_values)
def _test_lag_edge_single(
self, X_list, t, j, k_test, i_test, lag_parents, confirmed_parents
):
"""Return the p-value at time point t alone for lag edge (k_test, i_test)->j."""
n_samples = X_list[t].shape[0]
parts_Z = []
if confirmed_parents:
parts_Z.append(X_list[t][:, confirmed_parents])
for k, i in lag_parents:
if k <= t and (k, i) != (k_test, i_test):
parts_Z.append(X_list[t - k][:, i].reshape(-1, 1))
Z = np.hstack(parts_Z) if parts_Z else np.empty((n_samples, 0))
lag_cols = []
for k, i in lag_parents:
if k <= t:
lag_cols.append(X_list[t - k][:, i].reshape(-1, 1))
X_lag_all = np.hstack(lag_cols) if lag_cols else np.empty((n_samples, 0))
residual = self._compute_residual(X_list[t][:, j], Z)
_, p = hsic_test_gamma(residual, X_lag_all)
return p
class LongitudinalRESITBootstrapResult(object):
"""The result of bootstrapping for LongitudinalLiNGAM."""
def __init__(
self,
n_timepoints,
causal_orders,
B_lists,
A_lists,
adjacency_matrices_list,
is_common_graph,
):
"""Construct a BootstrapResult.
Parameters
----------
n_timepoints : int
The number of time points.
causal_orders : array-like, shape (n_sampling)
The causal order list by bootstrapping.
B_lists : array-like, shape (n_sampling)
The instantaneous causal graph list by bootstrapping.
A_lists : array-like, shape (n_sampling)
The lag causal graph list by bootstrapping.
adjacency_matrices_list : array-like, shape (n_sampling)
The combined adjacency matrices list by bootstrapping.
is_common_graph : bool
Whether the estimated graph is common across all time points or separate for each time point.
"""
self._n_timepoints = n_timepoints
self._causal_orders = causal_orders
self._B_lists = B_lists
self._A_lists = A_lists
self._adjacency_matrices_list = adjacency_matrices_list
self._is_common_graph = is_common_graph
@property
def causal_orders_(self):
"""Instantaneous causal order common to all time points.
Returns
-------
causal_orders_ : list of list of int
The causal orders of the fitted model for each bootstrap sample, where the first element is the most
upstream variable and the last is the sink.
"""
return self._causal_orders
@property
def B_lists_(self):
"""Estimated instantaneous causal graphs.
Returns
-------
B_lists_ : list of ndarray
The list of estimated instantaneous causal graphs for each bootstrap sample.
"""
return self._B_lists
@property
def A_lists_(self):
"""Estimated lag causal graphs.
Returns
-------
A_lists_ : list of ndarray
The list of estimated lag causal graphs for each bootstrap sample.
"""
return self._A_lists
def get_causal_direction_counts(
self,
n_directions=None,
):
"""Get causal direction count as a result of bootstrapping.
Parameters
----------
n_directions : int, optional (default=None)
If int, then The top ``n_directions`` items are included in the result
Returns
-------
causal_direction_counts : dict
List of causal directions sorted by count in descending order.
The dictionary has the following format::
{'from': [n_directions], 'to': [n_directions], 'count': [n_directions]}
where ``n_directions`` is the number of causal directions.
"""
# Check parameters
if isinstance(n_directions, (numbers.Integral, np.integer)):
if not 0 < n_directions:
raise ValueError("n_directions must be an integer greater than 0")
elif n_directions is None:
pass
else:
raise ValueError("n_directions must be an integer greater than 0")
if self._is_common_graph:
T = 1
else:
T = self._n_timepoints
# Count causal directions
cdc_list = []
for t in range(T):
directions = []
for am in self._adjacency_matrices_list:
direction = np.array(np.where(np.abs(am[t]) > 0.0))
directions.append(direction.T)
directions = np.concatenate(directions)
if len(directions) == 0:
cdc = {"from": [], "to": [], "count": []}
cdc_list.append(cdc)
continue
directions, counts = np.unique(directions, axis=0, return_counts=True)
sort_order = np.argsort(-counts)
sort_order = (
sort_order[:n_directions] if n_directions is not None else sort_order
)
counts = counts[sort_order]
directions = directions[sort_order]
cdc = {
"from": directions[:, 1].tolist(),
"to": directions[:, 0].tolist(),
"count": counts.tolist(),
}
cdc_list.append(cdc)
return cdc_list
def get_directed_acyclic_graph_counts(self, n_dags=None):
"""Get DAGs count as a result of bootstrapping.
Parameters
----------
n_dags : int, optional (default=None)
If int, then The top ``n_dags`` items are included in the result
Returns
-------
directed_acyclic_graph_counts : dict
List of directed acyclic graphs sorted by count in descending order.
The dictionary has the following format::
{'dag': [n_dags], 'count': [n_dags]}.
where ``n_dags`` is the number of directed acyclic graphs.
"""
# Check parameters
if isinstance(n_dags, (numbers.Integral, np.integer)):
if not 0 < n_dags:
raise ValueError("n_dags must be an integer greater than 0")
elif n_dags is None:
pass
else:
raise ValueError("n_dags must be an integer greater than 0")
if self._is_common_graph:
T = 1
else:
T = self._n_timepoints
# Count directed acyclic graphs
dagc_list = []
for t in range(T):
dags = []
for am in self._adjacency_matrices_list:
dag = np.abs(am[t]) > 0.0
dags.append(dag)
dags, counts = np.unique(dags, axis=0, return_counts=True)
sort_order = np.argsort(-counts)
sort_order = sort_order[:n_dags] if n_dags is not None else sort_order
counts = counts[sort_order]
dags = dags[sort_order]
dags = [
{"from": np.where(dag)[1].tolist(), "to": np.where(dag)[0].tolist()}
for dag in dags
]
dagc_list.append({"dag": dags, "count": counts.tolist()})
return dagc_list
def get_probabilities(self):
"""Get bootstrap probability.
Returns
-------
probabilities : array-like
List of bootstrap probability matrix.
"""
if self._is_common_graph:
T = 1
else:
T = self._n_timepoints
# Count directed acyclic graphs
probs = np.zeros(self._adjacency_matrices_list[0].shape)
for t in range(T):
for am in self._adjacency_matrices_list:
probs[t] += np.where(np.abs(am[t]) > 0.0, 1.0, 0.0)
probs[t] = probs[t] / len(self._adjacency_matrices_list)
return probs
def get_paths(self, from_index, to_index, from_t, to_t):
"""Get all paths from the start variable to the end variable and their bootstrap probabilities.
Parameters
----------
from_index : int
Index of the variable at the start of the path.
to_index : int
Index of the variable at the end of the path.
from_t : int
The starting timepoint of the path.
to_t : int
The end timepoint of the path.
Returns
-------
paths : dict
List of path and bootstrap probability.
The dictionary has the following format::
{'path': [n_paths], 'effect': [n_paths], 'probability': [n_paths]}
where ``n_paths`` is the number of paths.
"""
# check parameters
if to_t < from_t:
raise ValueError("to_t should be greater than or equal to from_t.")
if to_t == from_t:
if to_index == from_index:
raise ValueError("The same variable is specified for from and to.")
# Find all paths from from_index to to_index
paths_list = []
effects_list = []
for am in self._adjacency_matrices_list:
# am shape: (T, n_vars, n_vars * (n_lags + 1))
n_timepoints = to_t - from_t + 1
n_vars = am.shape[1]
n_lags = am.shape[2] // n_vars - 1
expansion_m_size = n_vars * n_timepoints
expansion_m = np.zeros((expansion_m_size, expansion_m_size))
# Block(i, j) represents edges from time (from_t + j) to (from_t + i)
for i in range(n_timepoints):
t = from_t + i
row = n_vars * i
for j in range(i + 1):
col = n_vars * j
tau = i - j
if tau > n_lags:
continue
c0 = tau * n_vars
c1 = (tau + 1) * n_vars
expansion_m[row : row + n_vars, col : col + n_vars] = am[
t, :, c0:c1
]
# row = n_features * i
# col = n_features * j
# t = i + from_t
# tau = i - j
# if col > row:
# continue
# if tau > am.shape[1] - 1:
# continue
# expansion_m[row : row + n_features, col : col + n_features] = am[t, tau]
paths, effects = find_all_paths(
expansion_m,
int(from_index),
int(n_vars * (to_t - from_t) + to_index),
0.0,
)
# Convert path to string to make them easier to handle.
paths_list.extend(["_".join(map(str, p)) for p in paths])
effects_list.extend(effects)
paths_list = np.array(paths_list)
effects_list = np.array(effects_list)
# Count paths
paths_str, counts = np.unique(paths_list, axis=0, return_counts=True)
# Sort by count
order = np.argsort(-counts)
probs = counts[order] / len(self._adjacency_matrices_list)
paths_str = paths_str[order]
# Calculate median of causal effect for each path
effects = [
np.median(effects_list[np.where(paths_list == p)]) for p in paths_str
]
result = {
"path": [[int(i) for i in p.split("_")] for p in paths_str],
"effect": effects,
"probability": probs.tolist(),
}
return result