Source code for synthpops.sampling

"""Sample distributions, either from real world data or from uniform distributions."""

import numpy as np
import numba as nb
import pandas as pd
import sciris as sc
import random
import itertools
import bisect
import scipy
from scipy import stats as st
import warnings
from . import base as spb
from . import plotting as sppl


def set_seed(seed=None):
    """Reset the random seed -- complicated because of Numba."""

    @nb.njit((nb.int64,), cache=True)
    def set_seed_numba(seed):
        return np.random.seed(seed)

    def set_seed_regular(seed):
        return np.random.seed(seed)

    # Dies if a float is given
    if seed is not None:
        seed = int(seed)

    set_seed_regular(seed)  # If None, reinitializes it
    if seed is None:  # Numba can't accept a None seed, so use our just-reinitialized Numpy stream to generate one
        seed = np.random.randint(1e9)
    set_seed_numba(seed)
    random.seed(seed)  # Finally, reset Python's built-in random number generator

    return
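
# Illustrative usage (added example, not part of the original source):
#
#     set_seed(1234)   # seeds NumPy, Numba, and Python's built-in random module
#     set_seed()       # reseed "randomly"; Numba gets a seed drawn from the freshly reinitialized NumPy stream
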
def fast_choice(weights):
    """
    Choose an option -- quickly -- from the provided weights. Weights do not need
    to be normalized. Reimplementation of random.choices(), removing everything
    inessential.

    Example:
        fast_choice([0.1,0.2,0.3,0.2,0.1]) # might return 2
    """
    cum_weights = list(itertools.accumulate(weights))
    return bisect.bisect(cum_weights, random.random()*(cum_weights[-1]), 0, len(cum_weights)-1)
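
# Illustrative sketch (added, not in the original source): fast_choice() returns an index into the
# weights, so empirical frequencies should roughly track the normalized weights. The weights below
# are made-up example values.
#
#     weights = [0.1, 0.2, 0.3, 0.2, 0.1]
#     counts = [0] * len(weights)
#     for _ in range(10000):
#         counts[fast_choice(weights)] += 1
#     # counts is expected to be roughly proportional to [0.1, 0.2, 0.3, 0.2, 0.1]
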
# @nb.njit(cache=True)
def sample_single_dict(distr_keys, distr_vals):
    """
    Sample a single value from a distribution supplied as separate arrays of keys and weights.

    Args:
        distr_keys (np.ndarray) : the values that can be sampled
        distr_vals (np.ndarray) : the weight of each value

    Returns:
        A single sampled value from the distribution.
    """
    return distr_keys[fast_choice(distr_vals)]
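
# Illustrative usage (added, not in the original source); the keys and weights below are made up:
#
#     distr_keys = np.array([20, 21, 22], dtype=np.int64)
#     distr_vals = np.array([0.5, 0.3, 0.2], dtype=np.float64)
#     sample_single_dict(distr_keys, distr_vals)   # returns 20, 21, or 22, weighted by distr_vals
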
def sample_single_arr(distr):
    """
    Sample an index from a distribution supplied as an array of weights.

    Args:
        distr (np.ndarray) : distribution weights

    Returns:
        A single sampled index from the distribution.
    """
    return fast_choice(distr)
def resample_age(age_dist_vals, age):
    """
    Resample age from a single year age distribution.

    Args:
        age_dist_vals (arr) : age distribution, ordered by age
        age (int)           : age as an integer

    Returns:
        Resampled age as an integer.
    """
    if age == 0:
        age_min = 0
        age_max = 1
    elif age == 1:
        age_min = 0
        age_max = 2
    elif age >= 2 and age <= 98:
        age_min = age - 2
        age_max = age + 2
    elif age == 99:
        age_min = 97
        age_max = 99
    else:
        age_min = 98
        age_max = 100

    age_distr = age_dist_vals[age_min:age_max + 1]  # create an array of the values, not yet normalized
    age_range = np.arange(age_min, age_max + 1)
    return age_range[fast_choice(age_distr)]
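
# Illustrative usage (added, not in the original source): resample an age near 30 from a made-up
# 101-element single-year age distribution covering ages 0-100.
#
#     age_dist_vals = np.ones(101) / 101    # hypothetical uniform age distribution
#     resample_age(age_dist_vals, 30)       # returns an integer in the window 28-32
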
def sample_from_range(distr, min_val, max_val):
    """
    Sample from a distribution from min_val to max_val, inclusive.

    Args:
        distr (dict)  : distribution with integer keys
        min_val (int) : minimum of the range to sample from
        max_val (int) : maximum of the range to sample from

    Returns:
        A sampled number from the range min_val to max_val in the distribution distr.
    """
    new_distr = spb.norm_age_group(distr, min_val, max_val)
    distr_keys = np.array(list(new_distr.keys()), dtype=np.int64)
    distr_vals = np.array(list(new_distr.values()), dtype=np.float64)
    return sample_single_dict(distr_keys, distr_vals)
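
# Illustrative usage (added, not in the original source); the distribution below is a made-up dict
# with integer keys, as expected by sample_from_range(). The dict is first passed through
# spb.norm_age_group() (which, as used here, appears to restrict and renormalize it to the range)
# before sampling.
#
#     distr = {i: 1.0 for i in range(101)}   # hypothetical uniform distribution over ages 0-100
#     sample_from_range(distr, 20, 30)       # returns an integer between 20 and 30, inclusive
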
def check_dist(actual, expected, std=None, dist='norm', check='dist', label=None, alpha=0.05, size=10000, verbose=True, die=False, stats=False):
    """
    Check whether counts match the expected distribution. The distribution can be
    any listed in scipy.stats. The parameters for the distribution should be supplied
    via the "expected" argument. The standard deviation for a normal distribution is
    a special case; it can be supplied separately or calculated from the (actual) data.

    Args:
        actual (int, float, or array) : the observed value, or distribution of values
        expected (int, float, tuple)  : the expected value; or, a tuple of arguments
        std (float)                   : for normal distributions, the standard deviation of the expected value (taken from data if not supplied)
        dist (str)                    : the type of distribution to use
        check (str)                   : what to check: 'dist' = entire distribution (default), 'mean' (equivalent to supplying np.mean(actual)), or 'median'
        label (str)                   : the name of the variable being tested
        alpha (float)                 : the significance level at which to reject the null hypothesis
        size (int)                    : the size of the sample from the expected distribution to compare with, if the distribution is discrete
        verbose (bool)                : print a warning if the null hypothesis is rejected
        die (bool)                    : raise an exception if the null hypothesis is rejected
        stats (bool)                  : whether to return statistics

    Returns:
        If stats is True, returns statistics: whether the null hypothesis is rejected, the p-value, the number of samples, the expected quintiles, the observed quintiles, and the observed quantile.

    **Examples**::

        sp.check_dist(actual=[3,4,4,2,3], expected=3, dist='poisson')
        sp.check_dist(actual=[0.14, -3.37, 0.59, -0.07], expected=0, std=1.0, dist='norm')
        sp.check_dist(actual=5.5, expected=(1, 5), dist='lognorm')
    """
    # Handle inputs
    label = f' "{label}"' if label else ''
    is_dist = sc.isiterable(actual)

    # Set distribution
    if dist.lower() in ['norm', 'normal', 'gaussian']:
        if std is None:
            if is_dist:
                std = np.std(actual)  # Get standard deviation from the data
            else: # pragma: no cover
                std = 1.0
        args = (expected, std)
        scipydist = getattr(scipy.stats, 'norm')
        truedist = scipy.stats.norm(expected, std)
    else:
        try:
            if sc.isnumber(expected):
                args = (expected, )
            else:
                args = tuple(expected)
            scipydist = getattr(scipy.stats, dist)
            truedist = scipydist(*args)
        except Exception as E:
            errormsg = f'Distribution "{dist}" not supported with the expected values supplied; valid distributions are those in scipy.stats'
            raise NotImplementedError(errormsg) from E

    # Calculate stats
    if is_dist and check == 'dist':
        quantile = truedist.cdf(np.median(actual))
        if isinstance(scipydist, scipy.stats.rv_continuous):
            # Use the K-S test to see whether the data came from the same distribution
            teststat, pvalue = scipy.stats.kstest(rvs=actual, cdf=dist, args=args)
        elif isinstance(scipydist, scipy.stats.rv_discrete):
            # K-S test against a large sample drawn from the theoretical distribution
            expected_r = truedist.rvs(size=size)
            teststat, pvalue = scipy.stats.ks_2samp(actual, expected_r)
        else: # pragma: no cover
            errormsg = 'Distribution is neither continuous nor discrete and so not supported at this time.'
            raise NotImplementedError(errormsg)
        null = pvalue > alpha
    else:
        if check == 'mean':
            value = np.mean(actual)
        elif check == 'median':
            value = np.median(actual)
        else:
            value = actual
        quantile = truedist.cdf(value)  # If it's a single value, see where it lands on the CDF
        pvalue = 1.0 - 2*abs(quantile - 0.5)  # E.g., 0.975 maps on to p=0.05
        minquant = alpha/2  # e.g., 0.025 for alpha=0.05
        maxquant = 1 - alpha/2  # e.g., 0.975 for alpha=0.05
        minval = truedist.ppf(minquant)
        maxval = truedist.ppf(maxquant)
        quant_check = (minquant <= quantile <= maxquant)  # True if above the minimum and below the maximum
        val_check = (minval <= value <= maxval)  # Check values
        null = quant_check or val_check  # Consider it to pass if either passes

    # Additional stats
    n_samples = len(actual) if is_dist else 1
    eps = 1.0/n_samples if n_samples > 4 else 1e-2  # For a small number of samples, use default limits
    quintiles = [eps, 0.25, 0.5, 0.75, 1-eps]
    obvs_quin = np.quantile(actual, quintiles) if is_dist else actual
    expect_quin = truedist.ppf(quintiles)

    # If the null hypothesis is rejected, print a warning or raise an error
    if not null:
        msg = f'''
Variable{label} with n={n_samples} samples is out of range using the distribution:
{dist}({args}) → p={pvalue} < α={alpha}
Expected quintiles are: {expect_quin}
Observed quintiles are: {obvs_quin}
Observed median is in quantile: {quantile}'''
        if die:
            raise ValueError(msg)
        elif verbose:
            warnings.warn(msg)

    # If the null hypothesis is not rejected and verbose is True, print a confirmation
    if null and verbose:
        print(f'Check passed. Null hypothesis with expected distribution: {dist}{args} not rejected.')
        if is_dist and check == 'dist':
            print(f'Test statistic: {teststat}, pvalue: {pvalue}')

    if not stats:
        return null
    else:
        s = sc.objdict()
        s.null = null
        s.pvalue = pvalue
        s.n_samples = n_samples
        s.expected_quintiles = expect_quin
        s.observed_quintiles = obvs_quin
        s.observed_quantile = quantile
        return s
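
# Illustrative usage (added, not in the original source), expanding the docstring examples; the
# data below are made up:
#
#     data = np.random.poisson(lam=3, size=500)
#     check_dist(actual=data, expected=3, dist='poisson', label='poisson test')  # full-distribution check
#     check_dist(actual=data, expected=3, dist='poisson', check='mean')          # single-value check of the mean
#     s = check_dist(actual=data, expected=3, dist='poisson', stats=True)        # returns an objdict of statistics
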
def check_normal(*args, **kwargs):
    ''' Alias to check_dist(dist='norm') '''
    dist = kwargs.pop('dist', 'norm')
    return check_dist(*args, **kwargs, dist=dist)
def check_poisson(*args, **kwargs):
    ''' Alias to check_dist(dist='poisson') '''
    dist = kwargs.pop('dist', 'poisson')
    return check_dist(*args, **kwargs, dist=dist)
def check_truncated_poisson(testdata, mu, lowerbound=None, upperbound=None, skipcheck=False, **kwargs):
    """
    Test whether data fit a Poisson distribution truncated between lowerbound and upperbound, using the K-S test.

    Args:
        testdata (array)   : data to be tested
        mu (float)         : expected mean for the Poisson distribution
        lowerbound (float) : lower bound for truncation
        upperbound (float) : upper bound for truncation

    Returns:
        (bool) True if the statistical check passed, else False.
    """
    sample_size = len(testdata)

    # Need to exclude any value at or below lowerbound and any value at or above upperbound, so first find the quantile
    # locations for lowerbound and upperbound, then only generate Poisson CDF values between these two locations
    minquantile = st.poisson.cdf(lowerbound, mu=mu) if lowerbound else 0
    maxquantile = st.poisson.cdf(upperbound, mu=mu) if upperbound else 1

    # Create uniformly distributed numbers between minquantile and maxquantile (in the CDF quantile space)
    q = np.random.uniform(low=minquantile, high=maxquantile, size=sample_size)

    # Use the percent point function to invert the CDF
    expected_data = st.poisson.ppf(q=q, mu=mu)

    result = True
    if not skipcheck:
        try:
            statistic_test(expected_data, testdata, test=st.kstest, verbose=True, die=True)
            result = True
        except ValueError as e:
            if 'reject the hypothesis' in str(e):
                result = False
            else:
                raise Exception(e)

    # Plot the comparison
    bins_count = int(min(10, max(expected_data) - min(expected_data) + 1))
    print(f"bins:{bins_count}")
    expected, bins = np.histogram(expected_data, bins=bins_count)
    actual = np.histogram(testdata, bins=bins)[0]
    kwargs["generated"] = actual

    # Merge the 11 bin edges into 10 bin centers so the bar plot aligns at the center
    merged_bins = [round((bins[idx] + bins[idx+1])/2, 2) for idx, val in enumerate(bins) if idx < len(bins)-1]
    kwargs["xvalue"] = merged_bins
    if kwargs["do_show"]:
        sppl.plot_array(expected, **kwargs)
    return result
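
# Illustrative usage (added, not in the original source); values are made up. Note that "do_show" is
# read directly from kwargs, so it must be supplied, and the remaining kwargs are passed through to
# sppl.plot_array():
#
#     testdata = np.random.poisson(lam=4, size=1000)
#     check_truncated_poisson(testdata, mu=4, lowerbound=1, upperbound=10, do_show=False)
#     # returns True if the K-S check passes, else False
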
def statistic_test(expected, actual, test=st.chisquare, verbose=True, die=False, **kwargs):
    """
    Perform a statistical check of expected vs. actual data, based on the null hypothesis that the
    expected and actual distributions are identical. Raise an exception if the expected and actual
    data differ significantly according to the test selected.
    See https://docs.scipy.org/doc/scipy/reference/stats.html#statistical-tests.

    Args:
        expected (array)   : the expected values
        actual (array)     : the observed values
        test (scipy.stats) : a scipy statistical test function, for example scipy.stats.chisquare
        verbose (bool)     : print a warning if the null hypothesis is rejected
        die (bool)         : raise an exception if the null hypothesis is rejected
        **kwargs (dict)    : optional arguments for the statistical test

    Returns:
        None.
    """
    # data = {'expected': expected, 'actual': actual}
    df_expected = pd.DataFrame(expected, columns=['expected'])
    df_actual = pd.DataFrame(actual, columns=['actual'])
    details = pd.merge(df_expected.describe(), df_actual.describe(), left_index=True, right_index=True, suffixes=('expected', 'actual'))
    print(f"use {str(test.__name__)} to check actual distribution")
    s, p = test(expected, actual, **kwargs)
    print(f"statistics: {s} pvalue:{p}")
    if p < 0.05: # pragma: no cover
        msg = f"Under the null hypothesis the expected/actual distributions are identical. " \
              f"If the test statistic is small or the p-value is high (greater than the significance level 5%), " \
              f"then we cannot reject the hypothesis. But we got p = {p} and s = {s}."
        if die: # pragma: no cover
            raise ValueError(msg)
        elif verbose: # pragma: no cover
            warnings.warn(f"data: \n{details}")
            warnings.warn(msg)
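
# Illustrative usage (added, not in the original source); values are made up. The default chi-square
# test expects the observed and expected frequencies to have the same total:
#
#     expected = np.array([10, 20, 30, 20, 10])
#     actual   = np.array([12, 18, 29, 22,  9])
#     statistic_test(expected, actual)                   # chi-square test; warns (or raises if die=True) when p < 0.05
#     statistic_test(expected, actual, test=st.kstest)   # two-sample K-S test instead, as used by check_truncated_poisson()
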