Source code for braindynamics_starprotocol.sca.crosscorrelogram

import os
import numpy as np
from scipy.stats import pearsonr
try:
    from .sca_cy import cross_correlation
except ModuleNotFoundError:
    from sca.sca_cy import cross_correlation
import matplotlib.pyplot as plt
from scipy.interpolate import Akima1DInterpolator
from scipy.optimize import minimize_scalar

def _fisher_transform(r:float, norm_factor:float=1.12)->float:
    return 0.5*log((1 + r/norm_factor)/(1 - r/norm_factor))

def _inverse_fisher_transform(z:float, norm_factor:float=1.12)->float:
    return norm_factor*(exp(2*z) - 1)/(exp(2*z) + 1)

def _pearson_correlation(self, x:np.ndarray, y:np.ndarray)->float:
    return pearsonr(x, y)[0]

[docs] class CrossCorrelogram(): """ Cross-correlogram object, can be either classical Pearson, or SCA. Attributes: max_shift_size (int): Maximum shift (time lag) of time series for which correlation will be computed. scale_size (int, optional): Scale size parameter of SCA (length of the average short term window of correlation). Defaults to None. corr_coeff_arr (np.ndarray): Array of correlogram values (correlation coefficients) for each shift (time lag). _interpolator: Interpolator object used to smoothen correlogram. _maxabs_shift (int): Shift (time lag) value of the correlogram's MaxAbs (absolute peak). _maxabs_value (float): Correlogram value of MaxAbs (absolute peak). _maxabs_is_min (bool): Whether or not MaxAbs (absolute peak) is a minimum or maximum of the correlogram. """ def __init__(self, max_shift_size:int, scale_size:int=None): self.max_shift_size = max_shift_size # size of maximal shift of cross-correlation self.scale_size = scale_size # scale parameter of SCA (if None, classical CC will be computed) self.corr_coeff_arr = np.zeros(2*max_shift_size + 1, dtype=np.float32) # array storing values of the correlogram self._interpolator = None # interpolator object self._maxabs_shift, self._maxabs_value = -max_shift_size-1, -max_shift_size-1 # maxabs shift & value self._maxabs_is_min = None # whether or not maxabs is a minimum or a maximum of the correlogram
[docs] def clear(self)->None: """ Clear the correlogram object. """ self.corr_coeff_arr = np.zeros(2*self.max_shift_size + 1, dtype=np.float32) self._interpolator = None self._maxabs_shift, self._maxabs_value = -self.max_shift_size-1, -self.max_shift_size-1
def _scaled_correlation(self, x:np.ndarray, y:np.ndarray, use_fisher:bool=False)->float: """ Python implementation of scaled correlation analysis (SCA) between two array of samples. For reference, see Nikolic et al. "Scaled correlation analysis: a better way to compute a cross-correlogram" (2012). Args: x (np.ndarray): First array of samples y (np.ndarray): Second array s (int): Scale/segment size use_fisher (bool, optional): Whether or not use Fisher transformation during SCA. Defaults to False. Returns: float: scaled correlation value """ s = self.scale_size T = len(x) # window size K = T//s # number of scale segments that fit into window if not use_fisher: r_s = 0.0 # scaled correlation for i in range(K): r_s += _pearson_correlation(x[i*s:(i + 1)*s], y[i*s:(i + 1)*s]) r_s /= K else: z = 0.0 # Fisher transform of scaled correlation for i in range(K): z += _fisher_transform(_pearson_correlation(x[i*s:(i + 1)*s], y[i*s:(i + 1)*s])) z /= K r_s = _inverse_fisher_transform(z) return r_s def _compute_py(self, x:np.ndarray, y:np.ndarray, use_fisher:bool=True)->None: """ Compute CC using pure python implementation (slow). Args: x (np.ndarray): first array of data points y (np.ndarray): second array of data points use_fisher (bool, optional): Whether or not use Fisher transformation during SCA. Defaults to True. """ self.corr_coeff_arr = np.zeros(2*self.max_shift_size + 1, dtype=float) n_x = len(x) n_y = len(y) if self.scale_size is not None: # SCA # shifting x to the left, relative to y for shift in range(-self.max_shift_size, 0): self.corr_coeff_arr[shift + self.max_shift_size] = self._scaled_correlation(x[abs(shift):], y[:n_y - abs(shift)], use_fisher=use_fisher) # zero shift self.corr_coeff_arr[self.max_shift_size] = self._scaled_correlation(x, y, use_fisher=use_fisher) # shifting x to the right, relative to y for shift in range(1, self.max_shift_size + 1): self.corr_coeff_arr[shift + self.max_shift_size] = self._scaled_correlation(x[:n_x - shift], y[shift:], use_fisher=use_fisher) else: # CC # shifting x to the left, relative to y for shift in range(-self.max_shift_size, 0): self.corr_coeff_arr[shift + self.max_shift_size] = _pearson_correlation(x[abs(shift):], y[:n_y - abs(shift)]) # zero shift self.corr_coeff_arr[self.max_shift_size] = _pearson_correlation(x, y) # shifting x to the right, relative to y for shift in range(1, self.max_shift_size + 1): self.corr_coeff_arr[shift + self.max_shift_size] = _pearson_correlation(x[:n_x - shift], y[shift:]) def _compute_cy(self, x:np.ndarray, y:np.ndarray, use_fisher:bool=True)->None: """ Compute CC using cython implementation (recommended). Args: x (np.ndarray): first array of data points y (np.ndarray): second array of data points use_fisher (bool, optional): Whether or not use Fisher transformation during SCA. Defaults to True. """ self.corr_coeff_arr = np.zeros(2*self.max_shift_size + 1, dtype=np.float32) if self.scale_size is not None: cross_correlation(self.corr_coeff_arr, x, y, self.max_shift_size, self.scale_size, use_fisher) else: cross_correlation(self.corr_coeff_arr, x, y, self.max_shift_size, -1, use_fisher)
[docs] def compute(self, x:np.ndarray, y:np.ndarray, use_fisher:bool=True, cc_method:str="C")->None: """ Compute CC using one of the implementations (cython or python). Args: x (np.ndarray): first array of data points y (np.ndarray): second array of data points use_fisher (bool, optional): Whether or not use Fisher transformation during SCA. Defaults to True. cc_method (str, optional): Which implementation to use for the computation. Defaults to "C". Raises: ValueError: Input arrays are of uneven length ValueError: Length of input arrays at maximal shift during SCA is less than the scale size ValueError: Wrong implementation method is given """ if len(x) != len(y): raise ValueError(f"Dimension mismatch between x ({len(x)}) and y ({len(y)})") if self.scale_size is not None and len(x) < self.scale_size + self.max_shift_size: raise ValueError(f"SCA: Length of correlation window at max shift ({len(x)} - {self.max_shift_size} = {len(x) - self.max_shift_size}) must be greater or equal than the scale/segment size ({self.scale_size})") x = x.astype(np.float32) # cython code expects float32 y = y.astype(np.float32) if cc_method == "C": self._compute_cy(x, y, use_fisher=use_fisher) elif cc_method == "py": self._compute_py(x, y, use_fisher=use_fisher) else: raise ValueError("Only two options are supported for `cc_method` parameter: 'py' for pure python implementation, 'C' for cython (recommended)")
def _find_extremas(self)->np.ndarray: """ Find extremas (peaks) of the correlogram. Returns: np.ndarray: array of booleans representing whether the given point of the correlogram is an extrema or not """ extremas = np.full(len(self.corr_coeff_arr), False, dtype=bool) for i in range(1, len(self.corr_coeff_arr) - 1): if (self.corr_coeff_arr[i] - self.corr_coeff_arr[i - 1])*(self.corr_coeff_arr[i + 1] - self.corr_coeff_arr[i]) < 0: extremas[i] = True return extremas def _set_interpolator(self, shifts:np.ndarray, values:np.ndarray): """ Wrapper function for the interpolator object. Args: shifts (np.ndarray): Time shifts (x axis of the correlogram) values (np.ndarray): Correlation values (y axis of the correlogram) """ self._interpolator = Akima1DInterpolator(shifts, values) def _interpolate(self, shift:float): if self._interpolator is None: raise ValueError('Akima interpolator object not set') return self._interpolator(shift)
[docs] def get_maxabs(self)->tuple: """ Function that determines & saves MaxAbs (absolute peak) of the interpolated correlogram. Raises: ValueError: Correlogram is not computed. Returns: tuple: MaxAbs shift, correlation value """ if (self.corr_coeff_arr == 0.0).all(): raise ValueError("Correlogram not computed.") if self.max_shift_size == 0: # if no shift was specified, return the only value as MaxAbs return self.max_shift_size, self.corr_coeff_arr[0] # finding maxabs on correlogram is_extrema = self._find_extremas() # check if no peaks were detected on correlogram if (is_extrema == False).all(): maxabs_idx = np.argmax(np.abs(self.corr_coeff_arr)) # in that case, maxabs will be its maximal absolute value (either first or last point, because it's monotonic) self._maxabs_shift = maxabs_idx - self.max_shift_size self._maxabs_value = self.corr_coeff_arr[maxabs_idx] print(f'\tNOTE: No peaks found on the correlogram (it is monotonic function). Returning the maximal absolute value of the correlogram ({self._maxabs_value}) located at the maximal shift ({self._maxabs_shift}).') return self._maxabs_shift, self._maxabs_value argmax_idx = np.argmax(np.abs(self.corr_coeff_arr[is_extrema])) maxabs_idx = np.where(is_extrema == 1)[0][argmax_idx] # index of extrema with max abs in the correlogram # preparing data for interpolation shifts = np.arange(-self.max_shift_size, self.max_shift_size + 1, 1) values = self.corr_coeff_arr # interpolate & find minima if self.corr_coeff_arr[maxabs_idx] < self.corr_coeff_arr[maxabs_idx - 1] and self.corr_coeff_arr[maxabs_idx] < self.corr_coeff_arr[maxabs_idx + 1]: self._maxabs_is_min = True self._set_interpolator(shifts, values) res = minimize_scalar(self._interpolate, bounds=(shifts[maxabs_idx - 1], shifts[maxabs_idx + 1]), method='bounded') # find minimum near the detected peak (maxabs) maxabs_hires_shift = res.x maxabs_hires_value = res.fun elif self.corr_coeff_arr[maxabs_idx] > self.corr_coeff_arr[maxabs_idx - 1] and self.corr_coeff_arr[maxabs_idx] > self.corr_coeff_arr[maxabs_idx + 1]: self._maxabs_is_min = False self._set_interpolator(shifts, -values) # if extrema is maxima, flip the function & find the minimas res = minimize_scalar(self._interpolate, bounds=(shifts[maxabs_idx - 1], shifts[maxabs_idx + 1]), method='bounded') maxabs_hires_shift = res.x maxabs_hires_value = -res.fun # flipping back to get maxima value self._maxabs_shift = maxabs_hires_shift self._maxabs_value = maxabs_hires_value return self._maxabs_shift, self._maxabs_value
[docs] def plot(self, show_interp:bool=False, show_maxabs:bool=False, interp_shift_res:float=0.1): """ Plot correlogram. Args: show_interp (bool, optional): Show interpolated version. Defaults to False. show_maxabs (bool, optional): Show maxabs on the plot. Defaults to False. interp_shift_res (float, optional): Resolution of the interpolated correlogram. Defaults to 0.1. Raises: Warning: MaxAbs was not determined. """ shifts = np.arange(-self.max_shift_size, self.max_shift_size + 1, 1) plt.scatter(shifts, self.corr_coeff_arr, color="tab:blue", label="CC") plt.plot(shifts, np.abs(self.corr_coeff_arr), color="tab:orange", alpha=0.6, label="Abs(CC)") if show_interp: hires_shifts = np.arange(-self.max_shift_size, self.max_shift_size + 1, interp_shift_res) if self._maxabs_is_min: plt.plot(hires_shifts, self._interpolator(hires_shifts), color="tab:blue", label="SmoothCC") else: plt.plot(hires_shifts, -self._interpolator(hires_shifts), color="tab:blue", label="SmoothCC") if show_maxabs: if self._maxabs_shift == -self.max_shift_size-1 or self._maxabs_value == -self.max_shift_size-1: raise Warning("Cannot plot maxabs, as it wasn't computed. Calculate it with CrossCorrelogram.get_maxabs() method") plt.scatter([self._maxabs_shift], [self._maxabs_value], color="tab:red", alpha=0.6, s=100, label="MaxAbsVal") plt.xlabel("shifts [S.U.]") plt.grid() plt.legend()