semopy



Module semopy.polycorr

This module implements polychoric and polyserial correlations.

Expand source code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""This module implements polychoric and polyserial correlations."""
from statsmodels.stats.correlation_tools import corr_nearest
from scipy.optimize import minimize, minimize_scalar
from itertools import chain, product, combinations
from scipy.stats import norm, mvn
from .utils import cor
import pandas as pd
import numpy as np


def bivariate_cdf(lower, upper, corr, means=(0, 0), var=(1, 1)):
    """
    Estimate an integral of a bivariate normal pdf.

    Estimates the probability mass of a bivariate normal distribution over
    the rectangle [lower[0], upper[0]] x [lower[1], upper[1]]. Consider using
    relatively big (i.e. 20 if using default mean and variance) lower and/or
    upper bounds when integrating to/from infinity.

    Parameters
    ----------
    lower : iterable of float
        Lower integration bounds for the two variables.
    upper : iterable of float
        Upper integration bounds for the two variables.
    corr : float
        Covariance between variables (equal to the correlation coefficient
        when both variances are 1).
    means : iterable of float, optional
        Mean values of variables. The default is (0, 0).
    var : iterable of float, optional
        Variances of variables. The default is (1, 1).

    Returns
    -------
    float
        P(lower[0] < x < upper[0], lower[1] < y < upper[1]).

    """
    # Local import: the private scipy.stats.mvn Fortran wrapper used
    # previously was removed in modern SciPy; multivariate_normal.cdf is the
    # supported replacement.
    from scipy.stats import multivariate_normal
    s = np.array([[var[0], corr], [corr, var[1]]])
    cdf = multivariate_normal(mean=means, cov=s).cdf
    # Rectangle probability via inclusion-exclusion on the joint cdf.
    p = (cdf([upper[0], upper[1]]) - cdf([lower[0], upper[1]])
         - cdf([upper[0], lower[1]]) + cdf([lower[0], lower[1]]))
    # Numerical round-off can produce a tiny negative value; clamp so callers
    # taking np.log never see a negative argument.
    return max(float(p), 0.0)

def univariate_cdf(lower, upper, mean=0, var=1):
    """
    Estimate an integral of a univariate normal pdf.

    Estimate an integral of a univariate normal pdf given integration lower
    and upper limits. Consider using relatively big (i.e. 20 if using default
    mean and variance) lower and/or upper bounds when integrating to/from
    infinity.

    Parameters
    ----------
    lower : float
        Lower integration bound.
    upper : float
        Upper integration bound.
    mean : float, optional
        Mean value of the variable. The default is 0.
    var : float, optional
        Variance of the variable. The default is 1.

    Returns
    -------
    float
        P(lower < x < upper).

    """
    # norm is parameterized by scale (standard deviation), not variance.
    # This replaces the private scipy.stats.mvn.mvnun wrapper, which was
    # removed in modern SciPy, with an exactly equivalent expression.
    sd = np.sqrt(var)
    return norm.cdf(upper, mean, sd) - norm.cdf(lower, mean, sd)

def estimate_intervals(x, inf=10):
    """
    Estimate intervals of the polytomized underlying latent variable.

    Parameters
    ----------
    x : np.ndarray
        An array of values of the ordinal variable (may contain NaNs).
    inf : float, optional
        A numerical infinity substitute. The default is 10.

    Returns
    -------
    list
        Polytomy interval edges [-inf, t_1, ..., t_{k-1}, inf].
    list
        1-based interval index for each entry of x; np.nan for missing
        entries so callers can filter them (see polychoric_corr).

    """
    x_f = x[~np.isnan(x)]
    u, counts = np.unique(x_f, return_counts=True)
    sz = len(x_f)
    # Cumulative empirical proportions determine the latent thresholds.
    cumcounts = np.cumsum(counts[:-1])
    # Previously np.where(u == sample)[0][0] raised IndexError for NaN
    # samples (NaN compares unequal to everything); map them to np.nan.
    inds = [np.nan if np.isnan(sample) else np.where(u == sample)[0][0] + 1
            for sample in x]
    return list(chain([-inf], (norm.ppf(n / sz) for n in cumcounts),
                      [inf])), inds

def polyserial_corr(x, y, x_mean=None, x_var=None, x_z=None, x_pdfs=None,
                    y_ints=None, scalar=True):
    """
    Estimate polyserial correlation.

    Estimate polyserial correlation between continuous variable x and
    ordinal variable y.

    Parameters
    ----------
    x : np.ndarray
        Data sample corresponding to x.
    y : np.ndarray
        Data sample corresponding to ordinal variable y.
    x_mean : float, optional
        Mean of x (calculated if not provided). The default is None.
    x_var : float, optional
        Variance of x (calculated if not provided). The default is None.
    x_z : np.ndarray, optional
        Standardized x (calculated if not provided). The default is None.
    x_pdfs : np.ndarray, optional
        x's logpdf sampled at each point (calculated if not provided). The
        default is None.
    y_ints : list, optional
        Polytomic intervals of an underlying latent variable
        corresponding to y (calculated if not provided) as returned by
        estimate_intervals. The default is None.
    scalar : bool, optional
        If True, minimize_scalar is used instead of SLSQP. The default is
        True.

    Returns
    -------
    float
        A polyserial correlation coefficient for x and y.

    """
    if x_mean is None:
        x_mean = np.nanmean(x)
    if x_var is None:
        x_var = np.nanvar(x)
    if y_ints is None:
        y_ints = estimate_intervals(y)
    x_std = np.sqrt(x_var)
    if x_z is None:
        # Standardization divides by the standard deviation, not the
        # variance (the previous code divided by x_var).
        x_z = (x - x_mean) / x_std
    if x_pdfs is None:
        # scipy's norm is parameterized by scale (std), not variance.
        x_pdfs = norm.logpdf(x, x_mean, x_std)
    ints, inds = y_ints

    def transform_tau(tau, rho, z):
        # Threshold of the latent variable conditional on z.
        return (tau - rho * z) / np.sqrt(1 - rho ** 2)

    def sub_pr(k, rho, z):
        # P(y falls in category k | z).
        i = transform_tau(ints[k], rho, z)
        j = transform_tau(ints[k - 1], rho, z)
        return univariate_cdf(j, i)

    def calc_likelihood(rho):
        # Negative log-likelihood; entries with missing data contribute
        # nothing and are skipped.
        return -sum(pdf + np.log(sub_pr(ind, rho, z))
                    for z, ind, pdf in zip(x_z, inds, x_pdfs)
                    if not (np.isnan(z) or np.isnan(ind)))

    def calc_likelihood_derivative(rho):
        def sub(k, z):
            i = transform_tau(ints[k], rho, z)
            j = transform_tau(ints[k - 1], rho, z)
            a = norm.pdf(i) * (ints[k] * rho - z)
            b = norm.pdf(j) * (ints[k - 1] * rho - z)
            return a - b
        t = (1 - rho ** 2) ** 1.5
        # xi avoids shadowing the enclosing x inside the generator.
        return -sum(sub(ind, z) / sub_pr(ind, rho, z)
                    for xi, z, ind in zip(x, x_z, inds)
                    if not (np.isnan(xi) or np.isnan(ind))) / t

    if not scalar:
        res = minimize(calc_likelihood, [0.0], jac=calc_likelihood_derivative,
                       method='SLSQP', bounds=[(-1.0, 1.0)]).x[0]
    else:
        res = minimize_scalar(calc_likelihood, bounds=(-1, 1),
                              method='bounded').x
    return res

def polychoric_corr(x, y, x_ints=None, y_ints=None):
    """
    Estimate polychoric correlation between ordinal variables x and y.

    Parameters
    ----------
    x : np.ndarray
        Data sample corresponding to x.
    y : np.ndarray
        Data sample corresponding to y.
    x_ints : list, optional
        Polytomic intervals of an underlying latent variable corresponding
        to x (calculated if not provided) as returned by estimate_intervals.
        The default is None.
    y_ints : list, optional
        Polytomic intervals of an underlying latent variable corresponding
        to y (calculated if not provided) as returned by estimate_intervals.
        The default is None.

    Returns
    -------
    float
        A polychoric correlation coefficient for x and y.

    """
    if x_ints is None:
        x_ints = estimate_intervals(x)
    if y_ints is None:
        y_ints = estimate_intervals(y)
    x_ints, x_inds = x_ints
    y_ints, y_inds = y_ints
    p, m = len(x_ints) - 1, len(y_ints) - 1
    # Contingency table of observed category pairs; pairs with a missing
    # component are skipped.
    n = np.zeros((p, m))
    for a, b in zip(x_inds, y_inds):
        if not (np.isnan(a) or np.isnan(b)):
            n[a - 1, b - 1] += 1

    def calc_likelihood(r):
        # Empty cells are skipped: they contribute nothing to the
        # likelihood, and evaluating n[i, j] * log(0) for them yields
        # 0 * -inf = NaN, which breaks the bounded optimizer.
        return -sum(n[i, j] * np.log(bivariate_cdf([x_ints[i], y_ints[j]],
                                                   [x_ints[i + 1],
                                                    y_ints[j + 1]], r))
                    for i in range(p) for j in range(m) if n[i, j] > 0)
    return minimize_scalar(calc_likelihood, bounds=(-1, 1),
                           method='bounded').x
                

def hetcor(data, ords=None, nearest=False):
    """
    Compute a heterogeneous correlation matrix.

    Pearson correlations are used for pairs of continuous variables,
    polyserial correlations for continuous-ordinal pairs and polychoric
    correlations for ordinal-ordinal pairs.

    Parameters
    ----------
    data : pd.DataFrame or np.ndarray
        Data with variables as columns.
    ords : list, optional
        Names of ordinal variables if data is a DataFrame or indices of
        ordinal columns if data is an np.ndarray. If ords is None then
        ordinal variables are determined automatically (a column is
        considered ordinal when it has few distinct values relative to its
        length). The default is None.
    nearest : bool, optional
        If True, then the nearest PD correlation matrix is returned instead.
        The default is False.

    Returns
    -------
    cor : pd.DataFrame or np.ndarray
        A heterogeneous correlation matrix (same container type as data).

    """
    if isinstance(data, np.ndarray):
        cov = cor(data)
        if ords is None:
            # Heuristic: few distinct values relative to sample size.
            ords = {i for i in range(data.shape[1])
                    if len(np.unique(data[:, i])) / data.shape[0] < 0.3}
        conts = set(range(data.shape[1])) - set(ords)

        def column(v):
            # data[v] would select a ROW of an ndarray; variables are
            # columns, so index the second axis.
            return data[:, v]

        def set_corr(a, b, value):
            cov[a, b] = value
            cov[b, a] = value
    else:
        cov = data.corr()
        if ords is None:
            ords = {v for v in data
                    if len(data[v].unique()) / len(data[v]) < 0.3}
        conts = set(data.columns) - set(ords)

        def column(v):
            return data[v]

        def set_corr(a, b, value):
            # .loc instead of chained cov[a][b] assignment, which does not
            # propagate under pandas copy-on-write.
            cov.loc[a, b] = value
            cov.loc[b, a] = value
    c_means = {v: np.nanmean(column(v)) for v in conts}
    c_vars = {v: np.nanvar(column(v)) for v in conts}
    # Standardize with the standard deviation, not the variance, and pass
    # the std as scipy's scale parameter.
    c_stds = {v: np.sqrt(c_vars[v]) for v in conts}
    c_z = {v: (column(v) - c_means[v]) / c_stds[v] for v in conts}
    c_pdfs = {v: norm.logpdf(column(v), c_means[v], c_stds[v])
              for v in conts}
    o_ints = {v: estimate_intervals(column(v)) for v in ords}

    for c, o in product(conts, ords):
        set_corr(c, o, polyserial_corr(column(c), column(o),
                                       x_mean=c_means[c], x_var=c_vars[c],
                                       x_z=c_z[c], x_pdfs=c_pdfs[c],
                                       y_ints=o_ints[o]))
    for a, b in combinations(ords, 2):
        set_corr(a, b, polychoric_corr(column(a), column(b),
                                       o_ints[a], o_ints[b]))
    if nearest:
        if isinstance(cov, pd.DataFrame):
            names = cov.columns
            cov = pd.DataFrame(corr_nearest(cov, threshold=0.05),
                               columns=names, index=names)
        else:
            cov = corr_nearest(cov, threshold=0.05)
    return cov

Functions

def bivariate_cdf(lower, upper, corr, means=[0, 0], var=[1, 1])

Estimates an integral of bivariate pdf.

Estimates an integral of bivariate pdf given integration lower and upper limits. Consider using relatively big (i.e. 20 if using default mean and variance) lower and/or upper bounds when integrating to/from infinity. Parameters


lower : float
Lower integration bounds.
upper : float
Upper integration bounds.
corr : float
Correlation coefficient between variables.
means : list, optional
Mean values of variables. The default is [0, 0].
var : list, optional
Variances of variables. The default is [1, 1].

Returns

float
P(lower[0] < x < upper[0], lower[1] < y < upper[1]).
Expand source code
def bivariate_cdf(lower, upper, corr, means=[0, 0], var=[1, 1]):
    """
    Estimates an integral of bivariate pdf.

    Estimates an integral of bivariate pdf given integration lower and 
    upper limits. Consider using relatively big (i.e. 20 if using default mean
    and variance) lower and/or upper bounds when integrating to/from infinity.
    Parameters
    ----------
    lower : float
        Lower integration bounds.
    upper : float
        Upper integration bounds.
    corr : float
        Correlation coefficient between variables.
    means : list, optional
        Mean values of variables. The default is [0, 0].
    var : list, optional
        Variances of variables. The default is [1, 1].

    Returns
    -------
    float
        P(lower[0] < x < upper[0], lower[1] < y < upper[1]).

    """
    s = np.array([[var[0], corr], [corr, var[1]]])
    return mvn.mvnun(lower, upper, means, s)[0]
def estimate_intervals(x, inf=10)

Estimate intervals of the polytomized underlying latent variable.

Parameters

x : np.ndarray
An array of values of the ordinal variable.
inf : float, optional
A numerical infinity substitute. The default is 10.

Returns

np.ndarray
An array containing polytomy intervals.
np.ndarray
An array containing indices of intervals corresponding to each entry.
Expand source code
def estimate_intervals(x, inf=10):
    """
    Estimate intervals of the polytomized underlying latent variable.

    Parameters
    ----------
    x : np.ndarray
        An array of values the ordinal variable..
    inf : float, optional
        A numerical infinity substitute. The default is 10.

    Returns
    -------
    np.ndarray
        An array containing polytomy intervals.
    np.ndarray
        An array containing indices of intervals corresponding to each entry.

    """
    x_f = x[~np.isnan(x)]
    u, counts = np.unique(x_f, return_counts=True)
    sz = len(x_f)
    cumcounts = np.cumsum(counts[:-1])
    u = [np.where(u == sample)[0][0] + 1 for sample in x]
    return list(chain([-inf], (norm.ppf(n / sz) for n in cumcounts), [inf])), u
def hetcor(data, ords=None, nearest=False)

Compute a heterogeneous correlation matrix.

Parameters

data : pd.DataFrame or np.ndarray
Data with variables as columns.
ords : list, optional
Names of ordinal variables if data is DataFrame or indices of ordinal numbers if data is np.array. If ords are None then ordinal variables will be determined automatically. The default is None.
nearest : bool, optional
If True, then nearest PD correlation matrix is returned instead. The default is False.

Returns

cor : pd.DataFrame
A heterogenous correlation matrix.
Expand source code
def hetcor(data, ords=None, nearest=False):
    """
    Compute a heterogenous correlation matrix.

    Parameters
    ----------
    data : pd.DataFrame or np.ndarray
        DESCRIPTION.
    ords : list, optional
        Names of ordinal variables if data is DataFrame or indices of
        ordinal numbers if data is np.array. If ords are None then ordinal
        variables will be determined automatically. The default is None.
    nearest : bool, optional
        If True, then nearest PD correlation matrix is returned instead. The
        default is False.

    Returns
    -------
    cor : pd.DataFrame
        A heterogenous correlation matrix.

    """
    if type(data) is np.ndarray:
        cov = cor(data)
        if ords is None:
            ords = set()
            for i in range(data.shape[1]):
                if len(np.unique(data[:, i])) / data.shape[0] < 0.3:
                    ords.add(i)
        conts = set(range(data.shape[1])) - set(ords)
    else:
        cov = data.corr()
        if ords is None:
            ords = set()
            for var in data:
                if len(data[var].unique()) / len(data[var]) < 0.3:
                    ords.add(var)
        conts = set(data.columns) - set(ords)
    c_means = {v: np.nanmean(data[v]) for v in conts}
    c_vars = {v: np.nanvar(data[v]) for v in conts}
    c_z = {v: (data[v] - c_means[v]) / c_vars[v] for v in conts}
    c_pdfs = {v: norm.logpdf(data[v], c_means[v], c_vars[v]) for v in conts}
    o_ints = {v: estimate_intervals(data[v]) for v in ords}

    for c, o in product(conts, ords):
        cov[c][o] = polyserial_corr(data[c], data[o], x_mean=c_means[c],
                                    x_var=c_vars[c], x_z=c_z[c],
                                    x_pdfs=c_pdfs[c], y_ints=o_ints[o])
        cov[o][c] = cov[c][o]
    for a, b in combinations(ords, 2):
        cov[a][b] = polychoric_corr(data[a], data[b], o_ints[a], o_ints[b])
        cov[b][a] = cov[a][b]
    if nearest:
        if type(cov) is pd.DataFrame:
            names = cov.columns
            cov = corr_nearest(cov,threshold=0.05)
            cov = pd.DataFrame(cov, columns=names, index=names)
        else:
            cov = corr_nearest(cov, threshold=0.05)
    return cov
def polychoric_corr(x, y, x_ints=None, y_ints=None)

Estimate polychoric correlation between ordinal variables x and y.

Parameters

x : np.ndarray
Data sample corresponding to x.
y : np.ndarray
Data sample corresponding to y.
x_ints : list, optional
Polytomic intervals of an underlying latent variable corresponding to x (calculated if not provided) as returned by estimate_intervals. The default is None.
y_ints : list, optional
Polytomic intervals of an underlying latent variable corresponding to y (calculated if not provided) as returned by estimate_intervals. The default is None.

Returns

float
A polychoric correlation coefficient for x and y.
Expand source code
def polychoric_corr(x, y, x_ints=None, y_ints=None):
    """
    Estimate polyserial correlation between ordinal variables x and y.

    Parameters
    ----------
    x : np.ndarray
        Data sample corresponding to x.
    y : np.ndarray
        Data sample corresponding to y.
    x_ints : list, optional
        Polytomic intervals of an underlying latent variable correspoding to y
        (calculated if not provided) as returned by estimate_intervals. The
        default is None.
    y_ints : list, optional
        Polytomic intervals of an underlying latent variable correspoding to y
        (calculated if not provided) as returned by estimate_intervals. The
        default is None.

    Returns
    -------
    float
        A polychoric correlation coefficient for x and y.

    """
    if x_ints is None:
        x_ints = estimate_intervals(x)
    if y_ints is None:
        y_ints = estimate_intervals(y)
    x_ints, x_inds = x_ints
    y_ints, y_inds = y_ints
    p, m = len(x_ints) - 1, len(y_ints) - 1
    n = np.zeros((p, m))
    for a, b in zip(x_inds, y_inds):
        if not (np.isnan(a) or np.isnan(b)):
            n[a - 1, b - 1] += 1
    def calc_likelihood(r):
        return -sum(np.log(bivariate_cdf([x_ints[i], y_ints[j]],
                                 [x_ints[i + 1], y_ints[j + 1]], r)) * n[i, j]
                    for i in range(p) for j in range(m))
    return minimize_scalar(calc_likelihood, bounds=(-1, 1), method='bounded').x
def polyserial_corr(x, y, x_mean=None, x_var=None, x_z=None, x_pdfs=None, y_ints=None, scalar=True)

Estimate polyserial correlation.

Estimate polyserial correlation between continuous variable x and ordinal variable y. Parameters


x : np.ndarray
Data sample corresponding to x.
y : np.ndarray
Data sample corresponding to ordinal variable y.
x_mean : float, optional
Mean of x (calculate if not provided). The default is None.
x_var : float, optional
Variance of x (calculate if not provided). The default is None.
x_z : np.ndarray, optional
Standardized x (calculated if not provided). The default is None.
x_pdfs : np.ndarray, optional
x's logpdf sampled at each point (calculated if not provided). The default is None.
y_ints : list, optional
Polytomic intervals of an underlying latent variable correspoding to y (calculated if not provided) as returned by estimate_intervals.. The default is None.
scalar : bool, optional
If true minimize_scalar is used instead of SLSQP.. The default is True.

Returns

float
A polyserial correlation coefficient for x and y..
Expand source code
def polyserial_corr(x, y, x_mean=None, x_var=None, x_z=None, x_pdfs=None,
                    y_ints=None, scalar=True):
    """
    Estimate polyserial correlation.
    
    Estimate polyserial correlation between continious variable x and
    ordinal variable y.
    Parameters
    ----------
    x : np.ndarray
        Data sample corresponding to x.
    y : np.ndarray
        Data sample corresponding to ordinal variable y.
    x_mean : float, optional
        Mean of x (calculate if not provided). The default is None.
    x_var : float, optional
        Variance of x (calculate if not provided). The default is None.
    x_z : np.ndarray, optional
        Stabdardized x (calculated if not provided). The default is None.
    x_pdfs : np.ndarray, optional
        x's logpdf sampled at each point (calculated if not provided). The
        default is None.
    y_ints : list, optional
        Polytomic intervals of an underlying latent variable
        correspoding to y (calculated if not provided) as returned by
        estimate_intervals.. The default is None.
    scalar : bool, optional
        If true minimize_scalar is used instead of SLSQP.. The default is True.

    Returns
    -------
    float
        A polyserial correlation coefficient for x and y..

    """
    if x_mean is None:
        x_mean = np.nanmean(x)
    if x_var is None:
        x_var = np.nanvar(x)
    if y_ints is None:
        y_ints = estimate_intervals(y)
    if x_z is None:
        x_z = (x - x_mean) / x_var
    if x_pdfs is None:
        x_pdfs = norm.logpdf(x, x_mean, x_var)
    ints, inds = y_ints
    def transform_tau(tau, rho, z):
        return (tau - rho * z) / np.sqrt(1 - rho ** 2)
    def sub_pr(k, rho, z):
        i = transform_tau(ints[k], rho, z)
        j = transform_tau(ints[k - 1], rho, z)
        return univariate_cdf(j, i)
    def calc_likelihood(rho):
        return -sum(pdf + np.log(sub_pr(ind, rho, z))
                    for z, ind, pdf in zip(x_z, inds, x_pdfs))
    def calc_likelihood_derivative(rho):
        def sub(k, z):
            i = transform_tau(ints[k], rho, z)
            j = transform_tau(ints[k - 1], rho, z)
            a = norm.pdf(i) * (ints[k] * rho - z)
            b = norm.pdf(j) * (ints[k - 1] * rho - z)
            return a - b
        t = (1 - rho ** 2) ** 1.5
        return -sum(sub(ind, z) / sub_pr(ind, rho, z)
                   for x, z, ind in zip(x, x_z, inds) if not np.isnan(x)) / t
    if not scalar:
        res = minimize(calc_likelihood, [0.0], jac=calc_likelihood_derivative,
                       method='SLSQP', bounds=[(-1.0, 1.0)]).x[0]
    else:
        res = minimize_scalar(calc_likelihood, bounds=(-1, 1),
                              method='bounded').x
    return res
def univariate_cdf(lower, upper, mean=0, var=1)

Estimate an integral of univariate pdf.

Estimate an integral of univariate pdf given integration lower and upper limits. Consider using relatively big (i.e. 20 if using default mean and variance) lower and/or upper bounds when integrating to/from infinity. Parameters


lower : float
Lower integration bound..
upper : float
Upper integration bound..
mean : float, optional
Mean value of the variable. The default is 0.
var : float, optional
Variance of the variable. The default is 1.

Returns

float
P(lower < x < upper).
Expand source code
def univariate_cdf(lower, upper, mean=0, var=1):
    """
    Estimate an integral of univariate pdf.

    Estimate an integral of univariate pdf given integration lower and 
    upper limits. Consider using relatively big (i.e. 20 if using default mean
    and variance) lower and/or upper bounds when integrating to/from infinity.
    Parameters
    ----------
    lower : float
        Lower integration bound..
    upper : float
        Upper integration bound..
    mean : float, optional
        Mean value of the variable. The default is 0.
    var : float, optional
        Variance of the variable. The default is 1.

    Returns
    -------
    float
        P(lower < x < upper).

    """
    return mvn.mvnun([lower], [upper], [mean], [var])[0]