"""
GWAS and Polygenic Score (PGS) computation from simulation output.
Provides:
- GWAS: per-variant association statistics (beta, SE, t, p) for each phenotype
- PGS: polygenic score calculation from external weights
"""
from __future__ import annotations
import numpy as np
from scipy import stats
from typing import Dict, List, Optional, Union
from dataclasses import dataclass, field
from xftsim.struct import HaplotypeOperator, DenseHaplotypeArray, PhenotypeArray
[docs]
@dataclass
class GWASResult:
"""
Per-variant association statistics for a single phenotype.
Attributes
----------
beta : np.ndarray
(m,) regression coefficients (phenotype on genotype dosage).
se : np.ndarray
(m,) standard errors of beta.
t_stat : np.ndarray
(m,) t-statistics (beta / se).
p_value : np.ndarray
(m,) two-sided p-values.
af : np.ndarray
(m,) allele frequencies.
n : int
Sample size used.
"""
beta: np.ndarray
se: np.ndarray
t_stat: np.ndarray
p_value: np.ndarray
af: np.ndarray
n: int
[docs]
class GWAS:
"""
Genome-wide association study: per-variant regression of phenotype on
genotype dosage.
Computes OLS regression statistics for every variant simultaneously
using efficient matrix operations (no per-variant loops).
Parameters
----------
haplotypes : HaplotypeOperator
Genotype data (DenseHaplotypeArray or GraphHaplotypeOperator).
phenotypes : PhenotypeArray
Phenotype data (dict-like, keyed by phenotype name).
sample_indices : np.ndarray, optional
Indices of samples to include (e.g. from an UnrelatedView).
If None, all samples are used.
Examples
--------
>>> gwas = GWAS(haplotypes, phenotypes)
>>> results = gwas.run() # all phenotype keys
>>> results = gwas.run(['Y']) # specific keys
>>> r = results['Y']
>>> r.beta, r.se, r.p_value
"""
def __init__(self, haplotypes: HaplotypeOperator,
phenotypes: PhenotypeArray,
sample_indices: np.ndarray | None = None) -> None:
self.haplotypes: HaplotypeOperator = haplotypes
self.phenotypes: PhenotypeArray = phenotypes
self.sample_indices: np.ndarray | None = sample_indices
[docs]
def run(self, keys: list[str] | None = None) -> dict[str, GWASResult]:
"""
Run GWAS for specified phenotype keys.
Parameters
----------
keys : list of str, optional
Phenotype keys to analyze. If None, uses all keys.
Returns
-------
dict[str, GWASResult]
Mapping from phenotype key to association results.
"""
if keys is None:
keys = list(self.phenotypes.keys)
# Materialize genotype dosage matrix
hap = self.haplotypes
if isinstance(hap, DenseHaplotypeArray):
G = hap.diploid_genotypes.astype(np.float64)
else:
G = hap.to_dense().diploid_genotypes.astype(np.float64)
# Subset samples if requested
if self.sample_indices is not None:
idx = self.sample_indices
G = G[idx, :]
else:
idx = None
n, m = G.shape
# Allele frequencies
af = G.mean(axis=0) / 2.0
# Precompute per-variant sums for OLS
# For simple linear regression y ~ x:
# beta_j = Cov(x_j, y) / Var(x_j)
# SE = sqrt(MSE / SS_x)
# Vectorized across all m variants.
G_mean = G.mean(axis=0) # (m,)
G_centered = G - G_mean # (n, m)
ss_x = np.sum(G_centered ** 2, axis=0) # (m,)
results = {}
for key in keys:
y = self.phenotypes[key]
if idx is not None:
y = y[idx]
y = np.asarray(y, dtype=np.float64)
y_mean = y.mean()
y_centered = y - y_mean # (n,)
# Cross-products: sum( (G_j - mean_j) * (y - mean_y) ) for each j
cross = G_centered.T @ y_centered # (m,)
# Regression coefficient
beta = np.where(ss_x > 0, cross / ss_x, 0.0)
# Residuals for each variant: r_ij = y_i - (alpha_j + beta_j * G_ij)
# But computing n x m residuals is expensive. Instead use the identity:
# MSE_j = (SS_y - beta_j^2 * SS_x_j) / (n - 2)
ss_y = np.sum(y_centered ** 2)
# Residual sum of squares per variant
ss_res = ss_y - beta ** 2 * ss_x # (m,)
ss_res = np.maximum(ss_res, 0.0) # numerical guard
df = n - 2
if df <= 0:
# Degenerate case: not enough samples
se = np.full(m, np.nan)
t_stat = np.full(m, np.nan)
p_value = np.full(m, np.nan)
else:
mse = ss_res / df # (m,)
se = np.where(ss_x > 0, np.sqrt(mse / ss_x), np.nan)
t_stat = np.where(se > 0, beta / se, 0.0)
# Two-sided p-value from t-distribution
p_value = np.where(
np.isfinite(t_stat),
2.0 * stats.t.sf(np.abs(t_stat), df=df),
np.nan,
)
results[key] = GWASResult(
beta=beta,
se=se,
t_stat=t_stat,
p_value=p_value,
af=af,
n=n,
)
return results
[docs]
class PGS:
"""
Polygenic score calculator.
Computes PGS = haplotypes @ weights using the HaplotypeOperator's
matvec (raw or standardized).
Parameters
----------
haplotypes : HaplotypeOperator
Genotype data.
weights : np.ndarray or dict
Effect weights. If dict, maps variant IDs to weights (looked up
against haplotypes.variants.vid). If np.ndarray, must be (m,).
standardized : bool
If True, use standardized genotypes (centered by 2*AF).
Default is False (raw dosage).
Examples
--------
>>> pgs = PGS(haplotypes, weights=beta_hat)
>>> scores = pgs.score()
"""
def __init__(self, haplotypes: HaplotypeOperator,
weights: np.ndarray | dict[str, float],
standardized: bool = False) -> None:
self.haplotypes: HaplotypeOperator = haplotypes
self.standardized: bool = standardized
if isinstance(weights, dict):
# Look up variant IDs and build weight vector
vid = self.haplotypes.variants.vid
w = np.zeros(len(vid), dtype=np.float64)
vid_list = list(vid)
for v, wt in weights.items():
try:
j = vid_list.index(v)
w[j] = wt
except ValueError:
pass # variant not in haplotypes, skip
self.weights = w
else:
self.weights = np.asarray(weights, dtype=np.float64)
[docs]
def score(self) -> np.ndarray:
"""
Compute polygenic scores.
Returns
-------
np.ndarray
(n,) array of polygenic scores, one per sample.
"""
if self.standardized:
return self.haplotypes.standardized_matvec(self.weights)
else:
return self.haplotypes.matvec(self.weights)