Source code for lingam.utils

"""
Python implementation of the LiNGAM algorithms.
The LiNGAM Project: https://sites.google.com/view/sshimizu06/lingam
"""

import numbers
import subprocess

import graphviz
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
import semopy

from matplotlib import colors as mcolors
from matplotlib.colors import is_color_like
from sklearn import linear_model
from sklearn.linear_model import LassoLarsIC, LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_array, check_scalar

from ._rcd import extract_ancestors
from ._f_correlation import f_correlation
from ._visualize_nonlinear_causal_effect import visualize_nonlinear_causal_effect

__all__ = [
    "print_causal_directions",
    "print_dagc",
    "make_prior_knowledge",
    "remove_effect",
    "make_dot",
    "make_dot_highlight",
    "predict_adaptive_lasso",
    "get_sink_variables",
    "get_exo_variables",
    "find_all_paths",
    "likelihood_i",
    "log_p_super_gaussian",
    "variance_i",
    "extract_ancestors",
    "f_correlation",
    "visualize_nonlinear_causal_effect",
    "evaluate_model_fit",
    "calculate_distance_from_root_nodes",
    "calculate_total_effect",
]


def make_prior_knowledge(
    n_variables,
    exogenous_variables=None,
    sink_variables=None,
    paths=None,
    no_paths=None,
):
    """Make matrix of prior knowledge.

    Parameters
    ----------
    n_variables : int
        Number of variables.
    exogenous_variables : array-like, shape (index, ...), optional (default=None)
        List of exogenous variables (index).
        Prior knowledge is created with the specified variables as exogenous variables.
    sink_variables : array-like, shape (index, ...), optional (default=None)
        List of sink variables (index).
        Prior knowledge is created with the specified variables as sink variables.
    paths : array-like, shape ((index, index), ...), optional (default=None)
        List of variable (index) pairs with a directed path.
        If ``(i, j)``, prior knowledge is created that xi has a directed path to xj.
    no_paths : array-like, shape ((index, index), ...), optional (default=None)
        List of variable (index) pairs without a directed path.
        If ``(i, j)``, prior knowledge is created that xi does not have a directed path to xj.

    Returns
    -------
    prior_knowledge : array-like, shape (n_variables, n_variables)
        Matrix of prior knowledge used for causal discovery.
    """
    prior_knowledge = np.full((n_variables, n_variables), -1)

    if no_paths:
        for no_path in no_paths:
            prior_knowledge[no_path[1], no_path[0]] = 0
    if paths:
        for path in paths:
            prior_knowledge[path[1], path[0]] = 1
    if sink_variables:
        for var in sink_variables:
            prior_knowledge[:, var] = 0
    if exogenous_variables:
        for var in exogenous_variables:
            prior_knowledge[var, :] = 0
    np.fill_diagonal(prior_knowledge, -1)

    return prior_knowledge
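

# Usage sketch (illustrative, not part of the library): -1 marks "unknown",
# 1 a required directed path, 0 a forbidden one, as encoded above.
#
#     >>> make_prior_knowledge(3, paths=[(0, 1)], no_paths=[(2, 1)])
#     array([[-1, -1, -1],
#            [ 1, -1,  0],
#            [-1, -1, -1]])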


def get_sink_variables(adjacency_matrix):
    """The sink variables (index) in the adjacency matrix.

    Parameters
    ----------
    adjacency_matrix : array-like, shape (n_variables, n_variables)
        Adjacency matrix, where n_variables is the number of variables.

    Returns
    -------
    sink_variables : array-like
        List of sink variables (index).
    """
    am = adjacency_matrix.copy()
    am = np.abs(am)
    np.fill_diagonal(am, 0)
    sink_vars = [i for i in range(am.shape[1]) if am[:, i].sum() == 0]
    return sink_vars


def get_exo_variables(adjacency_matrix):
    """The exogenous variables (index) in the adjacency matrix.

    Parameters
    ----------
    adjacency_matrix : array-like, shape (n_variables, n_variables)
        Adjacency matrix, where n_variables is the number of variables.

    Returns
    -------
    exogenous_variables : array-like
        List of exogenous variables (index).
    """
    am = adjacency_matrix.copy()
    am = np.abs(am)
    np.fill_diagonal(am, 0)
    exo_vars = [i for i in range(am.shape[1]) if am[i, :].sum() == 0]
    return exo_vars
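

# Usage sketch for get_sink_variables / get_exo_variables (illustrative). It
# assumes the LiNGAM convention that B[i, j] != 0 means x_j -> x_i, so a zero
# row marks an exogenous variable and a zero column marks a sink.
#
#     >>> import numpy as np
#     >>> B = np.array([[0.0, 0.0, 0.0],
#     ...               [2.0, 0.0, 0.0],
#     ...               [0.5, 1.0, 0.0]])
#     >>> get_exo_variables(B)
#     [0]
#     >>> get_sink_variables(B)
#     [2]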


def remove_effect(X, remove_features):
    """Create a dataset that removes the effects of features by linear regression.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        Data, where ``n_samples`` is the number of samples
        and ``n_features`` is the number of features.
    remove_features : array-like
        List of features (index) whose effects are removed.

    Returns
    -------
    X : array-like, shape (n_samples, n_features)
        Data after removing the effects of ``remove_features``.
    """
    X = np.copy(check_array(X))
    features_ = [i for i in np.arange(X.shape[1]) if i not in remove_features]
    for feature in features_:
        reg = linear_model.LinearRegression()
        reg.fit(X[:, remove_features], X[:, feature])
        X[:, feature] = X[:, feature] - reg.predict(X[:, remove_features])
    return X
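

# Usage sketch for remove_effect (hypothetical data): after regressing x0 out
# of x1, the residual column is uncorrelated with x0.
#
#     >>> import numpy as np
#     >>> rng = np.random.default_rng(0)
#     >>> x0 = rng.normal(size=1000)
#     >>> x1 = 2.0 * x0 + rng.normal(size=1000)
#     >>> X_res = remove_effect(np.column_stack([x0, x1]), remove_features=[0])
#     >>> bool(np.isclose(np.corrcoef(X_res.T)[0, 1], 0))
#     True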


def make_dot(
    adjacency_matrix,
    labels=None,
    lower_limit=0.01,
    prediction_feature_indices=None,
    prediction_target_label="Y(pred)",
    prediction_line_color="red",
    prediction_coefs=None,
    prediction_feature_importance=None,
    path=None,
    path_color=None,
    detect_cycle=False,
    ignore_shape=False,
):
    """Directed graph source code in the DOT language with specified adjacency matrix.

    Parameters
    ----------
    adjacency_matrix : array-like with shape (n_features, n_features)
        Adjacency matrix to make graph, where ``n_features`` is the number of features.
    labels : array-like, optional (default=None)
        Labels to use for graph features.
    lower_limit : float, optional (default=0.01)
        Threshold for drawing direction.
        Directions with absolute values of coefficients less than ``lower_limit`` are excluded.
    prediction_feature_indices : array-like, optional (default=None)
        Indices to use as prediction features.
    prediction_target_label : string, optional (default='Y(pred)')
        Label to use for the target variable of prediction.
    prediction_line_color : string, optional (default='red')
        Line color to use for the prediction graph.
    prediction_coefs : array-like, optional (default=None)
        Coefficients to use for the prediction graph.
    prediction_feature_importance : array-like, optional (default=None)
        Feature importance to use for the prediction graph.
    path : tuple, optional (default=None)
        Path to highlight. Tuple of start index and end index.
    path_color : string, optional (default=None)
        Color to highlight a path.
    detect_cycle : boolean, optional (default=False)
        Highlight simple cycles.
    ignore_shape : boolean, optional (default=False)
        Whether to skip checking the shape of ``adjacency_matrix``.

    Returns
    -------
    graph : graphviz.Digraph
        Directed graph source code in the DOT language.
        If the causal order is unknown, a double-headed arrow is drawn.
    """
    # Check parameters
    B = check_array(np.nan_to_num(adjacency_matrix))
    if not ignore_shape and B.shape[0] != B.shape[1]:
        raise ValueError("'adjacency_matrix' is not a square matrix.")
    if labels is not None:
        if B.shape[1] != len(labels):
            raise ValueError(
                "Length of 'labels' does not match length of 'adjacency_matrix'"
            )
    if prediction_feature_indices is not None:
        if prediction_coefs is not None and (
            len(prediction_feature_indices) != len(prediction_coefs)
        ):
            raise ValueError(
                "Length of 'prediction_coefs' does not match length of 'prediction_feature_indices'"
            )
        if prediction_feature_importance is not None and (
            len(prediction_feature_indices) != len(prediction_feature_importance)
        ):
            raise ValueError(
                "Length of 'prediction_feature_importance' does not match length of 'prediction_feature_indices'"
            )

    if path is not None:
        if not isinstance(path, tuple) or len(path) < 2:
            raise TypeError("'path' should be a tuple of node indices.")
        if path_color is not None and not is_color_like(path_color):
            raise ValueError("'path_color' should be a color name.")
        if path_color is None:
            path_color = "black"
        path_start = check_scalar(
            path[0], "path_start", int, min_val=0, max_val=B.shape[0]
        )
        path_end = check_scalar(path[1], "path_end", int, min_val=0, max_val=B.shape[0])

    if detect_cycle is not None:
        if not isinstance(detect_cycle, bool):
            raise TypeError("'detect_cycle' should be boolean.")

    idx = np.abs(B) > lower_limit

    nx_graph = None
    if path is not None or detect_cycle is True:
        nx_graph = nx.from_numpy_array(idx.astype(int).T, create_using=nx.DiGraph)

    # search path
    path_edges = None
    path_nodes = None
    if path is not None:
        path_edges = set()
        for path_ in nx.all_simple_paths(nx_graph, path[0], path[1]):
            path_edges |= set(zip(path_, path_[1:]))
        path_nodes = np.unique([list(edge) for edge in path_edges])
        if labels is None:
            path_nodes = [f"x{node}" for node in path_nodes]
        else:
            path_nodes = [labels[node] for node in path_nodes]

    # simple cycles
    cycle_edges = []
    if detect_cycle is True:
        for cycle in nx.simple_cycles(nx_graph):
            es = list(zip(cycle, np.roll(cycle, -1)))
            cycle_edges.append(es)
        cycle_edges = sum(cycle_edges, start=[])

    d = graphviz.Digraph(engine="dot")

    # nodes
    names = labels if labels else [f"x{i}" for i in range(len(B))]
    for name in names:
        kwargs = {}
        if path is not None:
            if name in path_nodes:
                kwargs["fontcolor"] = path_color
                kwargs["color"] = path_color
            if name == names[path_start] or name == names[path_end]:
                kwargs["peripheries"] = "2"
        d.node(name, **kwargs)

    # edges
    dirs = np.where(idx)
    for to, from_, coef in zip(dirs[0], dirs[1], B[idx]):
        kwargs = {}
        if path is not None and (from_, to) in path_edges:
            kwargs["penwidth"] = "2"
            kwargs["fontcolor"] = path_color
            kwargs["color"] = path_color
        if detect_cycle is True and (from_, to) in cycle_edges:
            kwargs["style"] = "dashed"
        d.edge(names[from_], names[to], label=f"{coef:.2f}", **kwargs)

    # integrate the prediction model
    if prediction_feature_indices is not None:
        d.node(
            prediction_target_label,
            color=prediction_line_color,
            fontcolor=prediction_line_color,
        )
        if prediction_coefs is not None:
            for from_, coef in zip(prediction_feature_indices, prediction_coefs):
                if np.abs(coef) > lower_limit:
                    d.edge(
                        names[from_],
                        prediction_target_label,
                        label=f"{coef:.2f}",
                        color=prediction_line_color,
                        fontcolor=prediction_line_color,
                        style="dashed",
                    )
        elif prediction_feature_importance is not None:
            for from_, imp in zip(
                prediction_feature_indices, prediction_feature_importance
            ):
                d.edge(
                    names[from_],
                    prediction_target_label,
                    label=f"({imp})",
                    color=prediction_line_color,
                    fontcolor=prediction_line_color,
                    style="dashed",
                )
        else:
            for from_ in prediction_feature_indices:
                d.edge(
                    names[from_],
                    prediction_target_label,
                    color=prediction_line_color,
                    style="dashed",
                )

    # If the value is nan, draw a double-headed arrow
    unk_order = np.where(np.isnan(np.tril(adjacency_matrix)))
    unk_order_set = set([val for item in unk_order for val in item])
    with d.subgraph() as s:
        s.attr(rank="same")
        for node in unk_order_set:
            s.node(names[node])
    for to, from_ in zip(unk_order[0], unk_order[1]):
        d.edge(names[from_], names[to], dir="both")

    return d
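

# Usage sketch for make_dot (rendering requires the Graphviz binaries; the
# returned graphviz.Digraph exposes the DOT text via .source):
#
#     >>> import numpy as np
#     >>> B = np.array([[0.0, 0.0], [1.5, 0.0]])   # x0 -> x1 with coefficient 1.5
#     >>> dot = make_dot(B)
#     >>> "x0 -> x1" in dot.source
#     True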


def make_dot_highlight(
    adjacency_matrix,
    node_index,
    labels=None,
    max_dsc=None,
    max_anc=None,
    lower_limit=None,
    cmap=None,
    draw_others=True,
    detect_cycle=False,
    path=None,
    path_color=None,
    vmargin=2,
    hmargin=2,
):
    """Hierarchical make_dot.

    Parameters
    ----------
    adjacency_matrix : array-like with shape (n_features, n_features)
        Adjacency matrix to make graph, where ``n_features`` is the number of features.
    node_index : int
        Index of the target node.
    labels : array-like, optional (default=None)
        Labels to use for graph features.
    max_dsc : int, optional (default=None)
        Number of hierarchies on the descendant side to display.
    max_anc : int, optional (default=None)
        Number of hierarchies on the ancestral side to display.
    lower_limit : float, optional (default=None)
        Threshold for drawing direction.
        If float, directions with absolute values of coefficients less than
        ``lower_limit`` are excluded.
    cmap : colormap, optional (default=None)
        Gradient color to represent hierarchy.
    draw_others : boolean, optional (default=True)
        Whether to draw the others cluster.
    detect_cycle : boolean, optional (default=False)
        Highlight simple cycles.
    path : tuple, optional (default=None)
        Path to highlight. Tuple of start index and end index.
    path_color : string, optional (default=None)
        Color to highlight a path.
    vmargin : float, optional (default=2)
        Vertical margin between nodes.
    hmargin : float, optional (default=2)
        Horizontal margin between nodes.

    Returns
    -------
    graph : graphviz.Digraph
        Directed graph source code in the DOT language.
    """
    # margins for clusters
    cls_margin_l = 1.5
    cls_margin_r = 0.6
    cls_margin_t = 0.6
    cls_margin_b = 0.4
    cls_space_target_other = 0.2

    # check arguments
    adj = check_array(
        adjacency_matrix,
        ensure_2d=True,
        ensure_min_samples=2,
        ensure_min_features=2,
        copy=True,
    )
    if adj.shape[0] != adj.shape[1]:
        raise ValueError("'adjacency_matrix' is not a square matrix.")

    node_index = check_scalar(
        node_index,
        "node_index",
        int,
        min_val=0,
        max_val=adj.shape[0],
        include_boundaries="left",
    )

    if labels is None:
        node_names = [f"x{i}" for i in range(adj.shape[1])]
    else:
        node_names = check_array(labels, ensure_2d=False, dtype=str)
        if len(node_names) != adj.shape[1]:
            raise ValueError(
                "Length of 'labels' does not match length of 'adjacency_matrix'."
            )

    if max_dsc is not None:
        max_dsc = check_scalar(max_dsc, "max_dsc", int, min_val=0)
    if max_anc is not None:
        max_anc = check_scalar(max_anc, "max_anc", int, min_val=0)
    if lower_limit is not None:
        lower_limit = check_scalar(lower_limit, "lower_limit", (int, float))
    draw_others = check_scalar(draw_others, "draw_others", bool)
    detect_cycle = check_scalar(detect_cycle, "detect_cycle", bool)

    path_start = None
    path_end = None
    if path is not None:
        if not isinstance(path, tuple) or len(path) < 2:
            raise TypeError("'path' should be a tuple of node indices.")
        if path_color is not None and not is_color_like(path_color):
            raise ValueError("'path_color' should be a color name.")
        if path_color is None:
            path_color = "black"
        path_start = check_scalar(
            path[0], "start", int, min_val=0, max_val=adj.shape[0]
        )
        path_end = check_scalar(path[1], "end", int, min_val=0, max_val=adj.shape[0])

    vmargin = check_scalar(vmargin, "vmargin", (float, int), min_val=1)
    hmargin = check_scalar(hmargin, "hmargin", (float, int), min_val=1)

    # apply lower_limit
    if lower_limit is not None:
        adj[abs(adj) < lower_limit] = 0

    # analyze network
    G = nx.from_numpy_array(~np.isclose(adj.T, 0), create_using=nx.DiGraph)

    dsc_path_length = nx.single_source_shortest_path_length(
        G, node_index, cutoff=max_dsc
    )
    del dsc_path_length[node_index]

    anc_path_length = nx.single_source_shortest_path_length(
        G.reverse(), node_index, cutoff=max_anc
    )
    del anc_path_length[node_index]

    # node indices
    dsc_indices = set(dsc_path_length.keys())
    anc_indices = set(anc_path_length.keys())
    isolate_indices = set(nx.isolates(G))
    other_indices = (
        set(np.arange(adj.shape[0]))
        - dsc_indices
        - anc_indices
        - isolate_indices
        - set([node_index])
    )

    # clusters (distance -> list of nodes)
    clusters = {}
    clusters[0] = [node_index]
    clusters[None] = sorted(other_indices) + sorted(isolate_indices)
    for node in dsc_indices:
        d = dsc_path_length[node]
        if d not in clusters.keys():
            clusters[d] = []
        clusters[d].append(node)
    for node in anc_indices:
        d = -anc_path_length[node]
        if d not in clusters.keys():
            clusters[d] = []
        clusters[d].append(node)

    # search path
    path_edges = None
    path_nodes = None
    if path is not None:
        path_edges = set()
        for path_ in nx.all_simple_paths(G, path_start, path_end):
            path_edges |= set(zip(path_, path_[1:]))
        # keep indices here: nodes are compared by index when drawing below
        path_nodes = np.unique([list(edge) for edge in path_edges])

    # create graphviz graph
    graph_attr = {"rankdir": "TB", "splines": "true"}
    graph = graphviz.Digraph("graph", engine="neato", graph_attr=graph_attr)

    # colors of clusters
    ds = sorted([d for d in clusters.keys() if d is not None])
    if cmap is None:
        cluster_colors = {d: "black" for d in ds}
    else:
        cmap = plt.get_cmap(cmap)
        cs = np.linspace(0, 1, len(ds) + 1)
        cluster_colors = dict(zip(ds, cmap(cs)))
    cluster_colors[None] = cluster_colors[0]

    # distance -> y_position
    pos_y_map = {0: 0, None: 0}

    # distance from target to descendants -> pos_y
    dsc_dists = sorted([d for d in clusters.keys() if d is not None and 0 < d])
    for i, dist in enumerate(dsc_dists):
        pos_y_map[dist] = -(i + 1) * vmargin

    # distance from target to ancestors -> pos_y
    anc_dists = sorted([d for d in clusters.keys() if d is not None and d < 0], key=abs)
    for i, dist in enumerate(anc_dists):
        pos_y_map[dist] = (i + 1) * vmargin

    get_cluster_name = lambda dist: f"cluster{dist}"
    get_node_id = lambda cl, node: f"{cl}-{node}"

    node_ids = {node: [] for node in np.arange(adj.shape[0])}

    # add subgraphs and nodes
    for dist, nodes in clusters.items():
        # skip drawing the cluster
        if dist is None and draw_others is False:
            continue
        if len(nodes) == 0:
            continue

        # attributes for the cluster and nodes
        if dist == 0:
            cls_label = "target"
            offset_x = 0
        elif dist is None:
            cls_label = "others"
            offset_x = cls_margin_r + cls_margin_l + cls_space_target_other
            nodes = sorted(
                nodes, key=lambda x: (x in isolate_indices, x in other_indices, x)
            )
        else:
            cls_label = (
                f"ancestor {abs(dist)}" if dist < 0 else f"descendant {abs(dist)}"
            )
            offset_x = 0
            nodes = sorted(nodes)
        pos_y = pos_y_map[dist]
        color = mcolors.to_hex(cluster_colors[dist], keep_alpha=True)
        cls_name = get_cluster_name(dist)

        graph_attr = {
            "style": "dashed",
            "label": cls_label,
            "labeljust": "l",
            "color": color,
        }
        with graph.subgraph(name=cls_name, graph_attr=graph_attr) as c:
            # add nodes
            for i, node in enumerate(nodes):
                node_id = get_node_id(cls_name, node)
                pos_x = hmargin * i + offset_x

                kwargs = {}
                if path is not None:
                    if node in path_nodes:
                        kwargs["fontcolor"] = path_color
                        kwargs["color"] = path_color
                    if node == path_start or node == path_end:
                        kwargs["peripheries"] = "2"

                c.node(
                    node_id,
                    node_names[node],
                    pos=f"{pos_x},{pos_y}!",
                    color=color,
                    **kwargs,
                )
                node_ids[node].append((node_id, dist))

            # dummy at the upper left
            x = -cls_margin_l + offset_x
            y = pos_y + cls_margin_t
            c.node(
                f"{cls_name}-dummy_upper",
                "",
                pos=f"{x},{y}!",
                style="invis",
                fixedsize="true",
                width="0",
                height="0",
            )

            # dummy at the lower right
            x = (len(nodes) - 1) * hmargin + cls_margin_r + offset_x
            y = pos_y - cls_margin_b
            c.node(
                f"{cls_name}-dummy_lower",
                "",
                pos=f"{x},{y}!",
                style="invis",
                fixedsize="true",
                width="0",
                height="0",
            )

    # cycles
    cycle_edges = []
    if detect_cycle is True:
        for cycle in nx.simple_cycles(G):
            es = list(zip(cycle, np.roll(cycle, -1)))
            cycle_edges.append(es)
        cycle_edges = sum(cycle_edges, start=[])

    # indices of clusters
    other_nodes = isolate_indices | other_indices
    valid_nodes = other_nodes | dsc_indices | anc_indices | set([node_index])

    # add edges
    edges = np.argwhere(~np.isclose(adj, 0))
    for h, t in edges:
        if draw_others is False and (t in other_nodes or h in other_nodes):
            continue
        if t not in valid_nodes or h not in valid_nodes:
            continue
        for tail_id, dist_t in node_ids[t]:
            for head_id, dist_h in node_ids[h]:
                color = cluster_colors[dist_h]
                color = mcolors.to_hex(color, keep_alpha=True)
                kwargs = {"color": color}
                if path is not None and (t, h) in path_edges:
                    kwargs["penwidth"] = "2"
                    kwargs["fontcolor"] = path_color
                    kwargs["color"] = path_color
                if detect_cycle is True and (t, h) in cycle_edges:
                    kwargs["style"] = "dashed"
                graph.edge(tail_id, head_id, label=f"{adj[h, t]:.2f}", **kwargs)

    return graph


def predict_adaptive_lasso(X, predictors, target, gamma=1.0):
    """Predict with Adaptive Lasso.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        Training data, where n_samples is the number of samples
        and n_features is the number of features.
    predictors : array-like, shape (n_predictors)
        Indices of predictor variables.
    target : int
        Index of the target variable.
    gamma : float, optional (default=1.0)
        Exponent applied to the absolute OLS coefficients to form the adaptive weights.

    Returns
    -------
    coef : array-like, shape (n_predictors)
        Coefficients of predictor variables.
    """
    # Standardize X
    scaler = StandardScaler()
    X_std = scaler.fit_transform(X)

    # Pruning with Adaptive Lasso
    lr = LinearRegression()
    lr.fit(X_std[:, predictors], X_std[:, target])
    weight = np.power(np.abs(lr.coef_), gamma)
    reg = LassoLarsIC(criterion="bic")
    reg.fit(X_std[:, predictors] * weight, X_std[:, target])
    pruned_idx = np.abs(reg.coef_ * weight) > 0.0

    # Calculate coefficients of the original scale
    coef = np.zeros(reg.coef_.shape)
    if pruned_idx.sum() > 0:
        lr = LinearRegression()
        pred = np.array(predictors)
        lr.fit(X[:, pred[pruned_idx]], X[:, target])
        coef[pruned_idx] = lr.coef_

    return coef
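

# Usage sketch for predict_adaptive_lasso on synthetic data where x2 depends
# only on x0 (hypothetical data; exact values vary with the random draw):
#
#     >>> import numpy as np
#     >>> rng = np.random.default_rng(0)
#     >>> X = rng.normal(size=(1000, 3))
#     >>> X[:, 2] = 3.0 * X[:, 0] + rng.normal(size=1000)
#     >>> predict_adaptive_lasso(X, predictors=[0, 1], target=2)  # doctest: +SKIP
#     (returns coefficients close to [3.0, 0.0])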


def find_all_paths(dag, from_index, to_index, min_causal_effect=0.0):
    """Find all paths from point to point in DAG.

    Parameters
    ----------
    dag : array-like, shape (n_features, n_features)
        The adjacency matrix to find all paths, where n_features is the number of features.
    from_index : int
        Index of the variable at the start of the path.
    to_index : int
        Index of the variable at the end of the path.
    min_causal_effect : float, optional (default=0.0)
        Threshold for detecting causal direction.
        Causal directions with absolute values of causal effects less than
        ``min_causal_effect`` are excluded.

    Returns
    -------
    paths : array-like, shape (n_paths)
        List of found paths, where n_paths is the number of paths.
    effects : array-like, shape (n_paths)
        List of causal effects, where n_paths is the number of paths.
    """
    # Extract all edges
    edges = np.array(np.where(np.abs(np.nan_to_num(dag)) > min_causal_effect)).T

    # Aggregate edges by start point
    to_indices = []
    for i in range(dag.shape[0]):
        adj_list = edges[edges[:, 1] == i][:, 0].tolist()
        if len(adj_list) != 0:
            to_indices.append(adj_list)
        else:
            to_indices.append([])

    # DFS
    paths = []
    stack = [from_index]
    stack_to_indice = [to_indices[from_index]]
    while stack:
        if len(stack) > dag.shape[0]:
            raise ValueError(
                "Unable to find the path because a cyclic graph has been specified."
            )

        cur_index = stack[-1]
        to_indice = stack_to_indice[-1]

        if cur_index == to_index:
            paths.append(stack.copy())
            stack.pop()
            stack_to_indice.pop()
        else:
            if len(to_indice) > 0:
                next_index = to_indice.pop(0)
                stack.append(next_index)
                stack_to_indice.append(to_indices[next_index].copy())
            else:
                stack.pop()
                stack_to_indice.pop()

    # Calculate the causal effect for each path
    effects = []
    for p in paths:
        coefs = [dag[p[i + 1], p[i]] for i in range(len(p) - 1)]
        effects.append(np.cumprod(coefs)[-1])

    return paths, effects
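

# Usage sketch for find_all_paths: two paths from x0 to x2, each effect being
# the product of the coefficients along the path (2*4 and 3):
#
#     >>> import numpy as np
#     >>> B = np.array([[0.0, 0.0, 0.0],
#     ...               [2.0, 0.0, 0.0],
#     ...               [3.0, 4.0, 0.0]])
#     >>> paths, effects = find_all_paths(B, from_index=0, to_index=2)
#     >>> paths
#     [[0, 1, 2], [0, 2]]
#     >>> [float(e) for e in effects]
#     [8.0, 3.0]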


def likelihood_i(x, i, b_i, bi_0):
    """Compute local log-likelihood of component i.

    Parameters
    ----------
    x : array-like, shape (n_features, n_samples)
        Data, where ``n_samples`` is the number of samples
        and ``n_features`` is the number of features.
    i : int
        Variable index.
    b_i : array-like
        The i-th row of the adjacency matrix, B[i].
    bi_0 : float
        Constant value for the i-th variable.

    Returns
    -------
    ll : float
        Local log-likelihood of component i.
    """
    sample_size = x.shape[1]  # number of data points

    var_i = variance_i(x, i, b_i)

    ll = 0.0
    ll += np.sum(log_p_super_gaussian((x[i] - np.dot(b_i, x) - bi_0) / np.sqrt(var_i)))
    ll -= sample_size * np.log(np.sqrt(var_i))

    return ll


def log_p_super_gaussian(s):
    """Compute the log-density of the normalized independent components.

    Parameters
    ----------
    s : array-like, shape (1, n_samples)
        Data, where ``n_samples`` is the number of samples.

    Returns
    -------
    x : float
        Log-density of the normalized independent components,
        whose disturbances are super-Gaussian.
    """
    const = -0.35  # normalising constant
    return -np.sqrt(2.0) * np.absolute(s) + const


def variance_i(X, i, b_i):
    """Compute empirical variance of component i.

    Parameters
    ----------
    X : array-like, shape (n_features, n_samples)
        Data, where ``n_samples`` is the number of samples
        and ``n_features`` is the number of features.
    i : int
        Variable index.
    b_i : array-like
        The i-th row of the adjacency matrix, B[i].

    Returns
    -------
    variance : float
        Empirical variance of component i.
    """
    # T = X.shape[1]  # sample size
    estimated_disturbance = X[i] - np.dot(b_i, X)
    # variance = np.sum(estimated_disturbance ** 2) / T  # JMLR paper assumes zero mean
    variance = np.var(estimated_disturbance)  # stable version, even if not zero mean

    return variance
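

# Worked sketch of the three functions above (hypothetical values): likelihood_i
# scores component i given the i-th row b_i of B, using the super-Gaussian
# log-density and the empirical variance. Note the (n_features, n_samples) layout.
#
#     >>> import numpy as np
#     >>> rng = np.random.default_rng(0)
#     >>> x = rng.laplace(size=(2, 500))      # super-Gaussian disturbances
#     >>> x[1] += 0.8 * x[0]                  # x0 -> x1 with coefficient 0.8
#     >>> likelihood_i(x, 1, np.array([0.8, 0.0]), 0.0)  # doctest: +SKIP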


def evaluate_model_fit(adjacency_matrix, X, is_ordinal=None):
    """Evaluate the given adjacency matrix and return fit indices.

    Parameters
    ----------
    adjacency_matrix : array-like, shape (n_features, n_features)
        Adjacency matrix representing a causal graph.
        The i-th column and row correspond to the i-th column of X.
    X : array-like, shape (n_samples, n_features)
        Training data.
    is_ordinal : array-like, shape (n_features,)
        Binary list. The i-th element indicates whether the i-th column of X
        is ordinal or not. 0 means not ordinal, otherwise ordinal.

    Returns
    -------
    fit_indices : pandas.DataFrame
        Fit indices. This API uses semopy's calc_stats().
        See semopy's reference for details.
    """
    # check inputs
    adj = check_array(adjacency_matrix, force_all_finite="allow-nan")
    if adj.ndim != 2 or (adj.shape[0] != adj.shape[1]):
        raise ValueError("adj must be a square matrix.")

    X = check_array(X)
    if X.shape[1] != adj.shape[1]:
        raise ValueError("X.shape[1] and adj.shape[1] must be the same.")

    if is_ordinal is None:
        is_ordinal = np.zeros(X.shape[1])
    else:
        is_ordinal = check_array(is_ordinal, ensure_2d=False).flatten()
    if is_ordinal.shape[0] != adj.shape[1]:
        raise ValueError("is_ordinal.shape[0] and adj.shape[1] must be the same.")

    # build desc
    desc = ""
    eta_names = []
    for i, row in enumerate(adj):
        # skip exogenous variables
        if np.sum(np.isnan(row)) == 0 and np.sum(np.isclose(row, 0)) == row.shape[0]:
            continue

        desc += f"x{i:d} ~ "
        for j, elem in enumerate(row):
            if np.isnan(elem):
                # latent confounder between x_i and x_j
                eta_name = f"eta_{i}_{j}" if i < j else f"eta_{j}_{i}"
                desc += f"{eta_name} + "
                if eta_name not in eta_names:
                    eta_names.append(eta_name)
            elif not np.isclose(elem, 0):
                desc += f"{elem:f} * x{j:d} + "
        # drop the trailing " + "
        desc = desc[: -len(" + ")] + "\n"

    if len(eta_names) > 0:
        desc += "DEFINE(latent) " + " ".join(eta_names) + "\n"

    if sum(is_ordinal) > 0:
        indices = np.argwhere(is_ordinal).flatten()
        desc += "DEFINE(ordinal)"
        for i in indices:
            desc += f" x{i}"
        desc += "\n"

    columns = [f"x{i:d}" for i in range(X.shape[1])]
    X = pd.DataFrame(X, columns=columns)

    m = semopy.Model(desc)
    m.fit(X)

    stats = semopy.calc_stats(m)
    return stats
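

# Usage sketch for evaluate_model_fit (needs semopy; illustrative data): the
# adjacency matrix is translated into a semopy model description such as
# "x1 ~ 2.000000 * x0" and scored with calc_stats().
#
#     >>> import numpy as np
#     >>> rng = np.random.default_rng(0)
#     >>> X = rng.normal(size=(500, 2))
#     >>> X[:, 1] += 2.0 * X[:, 0]
#     >>> adj = np.array([[0.0, 0.0], [2.0, 0.0]])
#     >>> evaluate_model_fit(adj, X)  # doctest: +SKIP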


def calculate_distance_from_root_nodes(adjacency_matrix, max_distance=None):
    """Calculate shortest distances from root nodes.

    Parameters
    ----------
    adjacency_matrix : array-like
        The adjacency matrix.
    max_distance : int or None, optional (default=None)
        The maximum distance to return nodes from root nodes.

    Returns
    -------
    shortest_distances : dict
        The dictionary has the following format::

            {'distance_from_root_node': [index_of_variables]}
    """
    # check inputs
    adjacency_matrix = check_array(
        adjacency_matrix,
        ensure_min_samples=2,
        ensure_min_features=2,
        force_all_finite="allow-nan",
    )
    if adjacency_matrix.shape[0] != adjacency_matrix.shape[1]:
        raise ValueError("adjacency_matrix must be a square matrix.")

    if max_distance is not None:
        max_distance = check_scalar(max_distance, "max_distance", int, min_val=1)

    # delete hidden common causes
    adjacency_matrix = np.nan_to_num(adjacency_matrix)

    # find root nodes
    root_indices = np.argwhere(
        np.isclose(np.sum(adjacency_matrix, axis=1), 0)
    ).flatten()
    if len(root_indices) == 0:
        raise ValueError("adjacency_matrix has no root nodes.")

    G = nx.from_numpy_array(adjacency_matrix.T, create_using=nx.DiGraph)

    # distances from root nodes
    dist_df = {}
    for index in root_indices:
        dist = nx.shortest_path_length(G, source=index)
        dist_df[index] = dist
    dist_df = pd.DataFrame(dist_df)
    dist_df = dist_df.fillna(np.iinfo(int).max)
    dist_df = dist_df.min(axis=1)

    if max_distance is not None:
        dist_df = dist_df.iloc[dist_df.values <= max_distance]

    dist_df = dist_df.sort_index()
    dist_df = dist_df.astype(int)

    result = {i: [] for i in pd.unique(dist_df)}
    for name, dist in dist_df.items():
        result[dist].append(name)

    return result


def calculate_total_effect(adjacency_matrix, from_index, to_index, is_continuous=None):
    """Calculate total effect.

    Parameters
    ----------
    adjacency_matrix : array-like
        The adjacency matrix.
    from_index : int
        The index of the cause variable.
    to_index : int
        The index of the effect variable.
    is_continuous : list, optional (default=None)
        List of booleans indicating whether each variable is continuous or discrete.

    Returns
    -------
    total_effect : float
        Sum of the path effects over all directed paths
        from ``from_index`` to ``to_index``.
    """
    # check inputs
    adjacency_matrix = check_array(
        adjacency_matrix,
        ensure_min_samples=2,
        ensure_min_features=2,
        force_all_finite="allow-nan",
    )
    if adjacency_matrix.shape[0] != adjacency_matrix.shape[1]:
        raise ValueError(
            "adjacency_matrix must be a square matrix.", adjacency_matrix.shape
        )

    from_index = check_scalar(
        from_index,
        "from_index",
        (numbers.Integral, np.integer),
        min_val=0,
        max_val=len(adjacency_matrix) - 1,
    )
    to_index = check_scalar(
        to_index,
        "to_index",
        (numbers.Integral, np.integer),
        min_val=0,
        max_val=len(adjacency_matrix) - 1,
    )

    if is_continuous is None:
        is_continuous = [True for _ in range(len(adjacency_matrix))]
    else:
        is_continuous = check_array(
            is_continuous, ensure_2d=False, ensure_min_samples=len(adjacency_matrix)
        )

    # find all paths
    path_list, effects = find_all_paths(adjacency_matrix, from_index, to_index)

    # check that all nodes on each path, except the start, are continuous
    for path in path_list:
        for node in path[1:]:
            if not is_continuous[node]:
                raise ValueError("Variables on the path must be continuous variables.")

    total_effect = sum(effects)
    return total_effect


def get_cuda_version():
    """Check whether CUDA's ``nvcc`` is available on PATH and print its version."""
    try:
        nvcc_version = subprocess.check_output(["nvcc", "--version"]).decode("utf-8")
        print("CUDA Version found:\n", nvcc_version)
        return True
    except Exception as e:
        print("CUDA not found or nvcc not in PATH:", e)
        return False
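

# Usage sketch for the calculate_* helpers (illustrative): x0 is the only root
# node, and the total effect of x0 on x2 sums both path products (2*4 + 3).
#
#     >>> import numpy as np
#     >>> B = np.array([[0.0, 0.0, 0.0],
#     ...               [2.0, 0.0, 0.0],
#     ...               [3.0, 4.0, 0.0]])
#     >>> calculate_distance_from_root_nodes(B)  # doctest: +SKIP
#     {0: [0], 1: [1, 2]}
#     >>> float(calculate_total_effect(B, 0, 2))
#     11.0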