Source code for lingam.utils._common_edge

import numpy as np
import graphviz
import networkx as nx
from matplotlib.colors import is_color_like
from sklearn.utils import check_array, check_scalar


[docs] def get_common_edge_probabilities(bootstrap_results, mode='across'): """ Calculate the probability of common edges across multiple bootstrap results. This function analyzes adjacency matrices from a list of bootstrap results and computes the probability of common edges (non-zero and not NaN) and hidden common cause edges (NaN) either per sample or across all samples depending on the selected mode. Parameters ---------- bootstrap_results : list A list of objects, each containing an attribute `adjacency_matrices_`, which is a list or array of adjacency matrices (2D numpy arrays) from bootstrap iterations. mode : str, optional The mode of calculation. Must be either: * 'per': Computes common edge probabilities by extracting common edges at each bootstrap iteration. * 'across': Computes common edge probabilities across all samples within each bootstrap result. Returns ------- common_edge_probabilities : np.ndarray A stacked array of shape (2, n, n), where: * common_edge_probabilities[0]: The probabilities of common edges. * common_edge_probabilities[1]: The nan probabilities of hidden common cause. """ # check parameters if not isinstance(bootstrap_results, list): raise ValueError("bootstrap_results must be a list.") for bootstrap_result in bootstrap_results: if not hasattr(bootstrap_result, "adjacency_matrices_"): raise AttributeError("The bootstrap_result must implement a callable 'adjacency_matrices_'.") if mode == 'across': common_edge_prob_list = [] hidden_common_edge_prob_list = [] for bootstrap_result in bootstrap_results: matrices = np.array(bootstrap_result.adjacency_matrices_) # common edge: non-zero and not NaN valid_mask = ~np.isnan(matrices) nonzero_mask = (matrices != 0) & valid_mask common_edge_prob = nonzero_mask.sum(axis=0) / len(matrices) common_edge_prob_list.append(common_edge_prob) # hidden common cause edges: NaN unobserved_prob = np.isnan(matrices).sum(axis=0) / len(matrices) hidden_common_edge_prob_list.append(unobserved_prob) # Combine probabilities common_edge_prob = np.prod(common_edge_prob_list, axis=0) hidden_common_edge_prob = np.prod(hidden_common_edge_prob_list, axis=0) # Stack into a single array of shape (2, n, n) common_edge_probabilities = np.stack((common_edge_prob, hidden_common_edge_prob), axis=0) elif mode == 'per': n_samples = len(bootstrap_results[0].adjacency_matrices_) n_vars = bootstrap_results[0].adjacency_matrices_[0].shape[0] common_edge_counts = np.zeros((n_vars, n_vars)) hidden_common_edge_counts = np.zeros((n_vars, n_vars)) # Extract common edges from each bootstrap iteration for i in range(n_samples): nonzero_masks = [] nan_masks = [] for bootstrap_result in bootstrap_results: adj_matrix = bootstrap_result.adjacency_matrices_[i] # common edge: non-zero and not NaN valid_mask = ~np.isnan(adj_matrix) nonzero_mask = (adj_matrix != 0) & valid_mask nonzero_masks.append(nonzero_mask) # hidden common cause edges: NaN nan_mask = np.isnan(adj_matrix) nan_masks.append(nan_mask) common_nonzero_mask = np.logical_and.reduce(nonzero_masks) common_nan_mask = np.logical_and.reduce(nan_masks) common_edge_counts += common_nonzero_mask.astype(int) hidden_common_edge_counts += common_nan_mask.astype(int) # Combine probabilities common_edge_prob = common_edge_counts / n_samples hidden_common_edge_prob = hidden_common_edge_counts / n_samples # Stack into a single array of shape (2, n, n) common_edge_probabilities = np.stack((common_edge_prob, hidden_common_edge_prob), axis=0) else: raise ValueError("Invalid mode. Choose 'per' or 'across'.") return common_edge_probabilities
[docs] def make_dot_for_nan_probability_matrix( adjacency_matrix, labels=None, lower_limit=0.01, path=None, path_color=None, ignore_shape=False, ): """Directed graph source code in the DOT language with specified nan probability matrix of hidden common cause. Parameters ---------- adjacency_matrix : array-like with shape (n_features, n_features) Nan probability matrix of hidden common cause to make graph, where ``n_features`` is the number of features. labels : array-like, optional (default=None) Label to use for graph features. lower_limit : float, optional (default=0.01) Threshold for drawing direction. If float, then directions with absolute values of coefficients less than ``lower_limit`` are excluded. path : tuple, optional (default=None) Path to highlight. Tuple of start index and end index. path_color : string, optional (default=None) Colors to highlight a path. ignore_shape : boolean, optional (default=False) Ignore checking the shape of adjaceny_matrix or not. Returns ------- graph : graphviz.Digraph Directed graph source code in the DOT language. If order is unknown, draw a double-headed arrow. """ # Check parameters B = check_array(np.nan_to_num(adjacency_matrix)) if not ignore_shape and B.shape[0] != B.shape[1]: raise ValueError("'adjacency_matrix' is not square matrix.") if labels is not None: if B.shape[1] != len(labels): raise ValueError( "Length of 'labels' does not match length of 'adjacency_matrix'" ) if path is not None: if not isinstance(path, tuple) or len(path) < 2: raise TypeError("'path' should be a tuple of node indices.") if path_color is not None and not is_color_like(path_color): raise ValueError("'path_color' should be an color name.") if path_color is None: path_color = "black" path_start = check_scalar( path[0], "path_start", int, min_val=0, max_val=B.shape[0] ) path_end = check_scalar(path[1], "path_end", int, min_val=0, max_val=B.shape[0]) idx = np.abs(B) > lower_limit nx_graph = None if path is not None: nx_graph = nx.from_numpy_array(idx.astype(int).T, create_using=nx.DiGraph) # search path path_edges = None path_nodes = None if path is not None: path_edges = set() for path_ in nx.all_simple_paths(nx_graph, path[0], path[1]): path_edges |= set(zip(path_, path_[1:])) path_nodes = np.unique([list(edge) for edge in path_edges]) if labels is None: path_nodes = [f"x{node}" for node in path_nodes] else: path_nodes = [labels[node] for node in path_nodes] d = graphviz.Digraph(engine="dot") # nodes names = labels if labels else [f"x{i}" for i in range(len(B))] for name in names: kwargs = {} if path is not None: if name in path_nodes: kwargs["fontcolor"] = path_color kwargs["color"] = path_color if name == names[path_start] or name == names[path_end]: kwargs["peripheries"] = "2" d.node(name, **kwargs) # edges # This matrix is a probability matrix of nan, and the diagonal components contain the same value dirs = np.where(np.tril(idx)) for to, from_, coef in zip(dirs[0], dirs[1], B[dirs]): kwargs = {} if path is not None and (from_, to) in path_edges: kwargs["penwidth"] = "2" kwargs["fontcolor"] = path_color kwargs["color"] = path_color if from_ != to: if idx[to, from_] and B[from_, to] == B[to, from_]: d.edge(names[from_], names[to], dir="both", label=f"{coef:.2f}", **kwargs) return d