import warnings
import numpy as np
import pandas as pd
import nptyping as npt
import xarray as xr
import dask.array as da
from dataclasses import dataclass, field
from nptyping import NDArray, Int8, Int64, Float64, Bool, Shape, Float, Int
from abc import ABC, abstractmethod
from typing import Any, Hashable, List, Iterable, Callable, Union, Dict, Tuple, Optional
from functools import cached_property
from scipy import interpolate
import xftsim as xft
[docs]
class GeneticMap:
"""
Map between physical and genetic distances.
Parameters
----------
chrom : Iterable
Chromsomes variants are located on
pos_bp : Iterable
Physical positions of variants
pos_cM : Iterable
Map distances in cM
Attributes
__________
frame : pd.DataFrame
Pandas DataFrame with the above columns
chroms : np.ndarray
Unique chromosomes present in map
"""
def __init__(self,
chrom: Iterable,
pos_bp: Iterable,
pos_cM: Iterable):
self.frame = pd.DataFrame.from_dict(dict(chrom = chrom,
pos_bp = pos_bp,
pos_cM = pos_cM))
self.chroms = np.unique(self.frame.chrom.values.astype(int)).astype(str)
[docs]
@classmethod
def from_pyrho_maps(cls, paths: Iterable, sep='\t', **kwargs) -> "GeneticMap":
"""Construct genetic map objects from maps provided at https://github.com/popgenmethods/pyrho
Please cite their work if you use their maps.
Parameters
----------
paths : Iterable
Paths for each chromosome
sep : str, optional
Passed to pd.read_csv()
**kwargs
Additional arguments to pd.read_csv()
Returns
-------
GeneticMap
"""
gmap = pd.concat([pd.read_csv(path, sep='\t', **kwargs) for path in paths])
chrom = np.char.lstrip(gmap.Chromosome.values.astype(str),'chr')
pos_bp = gmap['Position(bp)'].values
pos_cM = gmap['Map(cM)'].values
return cls(chrom, pos_bp, pos_cM)
[docs]
def interpolate_cM_chrom(self, pos_bp: Iterable, chrom: str, **kwargs):
"""
Interpolate cM values in a specified chromosome based on genetic map information.
Parameters
----------
pos_bp : Iterable
Physical positions for which to interpolate cM values
chrom : str
Chromosome on which to interpolate
**kwargs
Additional keyword arguments to be passed to scipy.interpolate.interp1d.
"""
subset = self.frame[self.frame.chrom==chrom]
interpolator = interpolate.interp1d(x = subset.pos_bp.values,
y = subset.pos_cM.values,
**kwargs)
return interpolator(pos_bp)
# ---------------------------------------------------------------------------
# HaplotypeOperator ABC (new — typed against by Architecture, EffectSpec, etc.)
# ---------------------------------------------------------------------------
[docs]
class HaplotypeOperator(ABC):
"""Abstract base class for all genotype representations.
Defines the interface for matrix-vector operations on haplotype data,
used by the architecture's genetic components. All implementations
must provide ``samples`` (SampleMeta) and ``variants`` (VariantMeta)
attributes.
Concrete implementations:
- ``DenseHaplotypeArray`` -- NumPy-backed (n, m, 2) array
- ``GraphHaplotypeOperator`` -- GRG wrapper (pygrgl graph traversal)
Attributes
----------
samples : SampleMeta
Sample metadata (set by concrete implementations).
variants : VariantMeta
Variant metadata (set by concrete implementations).
"""
@property
@abstractmethod
def n(self) -> int:
"""Number of samples (individuals)."""
...
@property
@abstractmethod
def m(self) -> int:
"""Number of diploid variants."""
...
[docs]
@abstractmethod
def matvec(self, v: np.ndarray) -> np.ndarray:
"""Diploid genotype-vector product: G @ v.
Parameters
----------
v : np.ndarray
Effect vector of shape (m,) or (m, k).
Returns
-------
np.ndarray
Result of shape (n,) or (n, k).
"""
...
[docs]
@abstractmethod
def rmatvec(self, v: np.ndarray) -> np.ndarray:
"""Transpose genotype-vector product: G.T @ v.
Parameters
----------
v : np.ndarray
Vector of shape (n,) or (n, k).
Returns
-------
np.ndarray
Result of shape (m,) or (m, k).
"""
...
[docs]
@abstractmethod
def matvec_maternal(self, v: np.ndarray) -> np.ndarray:
"""Maternal haplotype matvec: hap[:,:,0] @ v.
Parameters
----------
v : np.ndarray
Effect vector of shape (m,).
Returns
-------
np.ndarray
Result of shape (n,).
"""
...
[docs]
@abstractmethod
def matvec_paternal(self, v: np.ndarray) -> np.ndarray:
"""Paternal haplotype matvec: hap[:,:,1] @ v.
Parameters
----------
v : np.ndarray
Effect vector of shape (m,).
Returns
-------
np.ndarray
Result of shape (n,).
"""
...
[docs]
@abstractmethod
def standardized_matvec(self, v: np.ndarray, af: np.ndarray = None) -> np.ndarray:
"""Per-SNP standardized diploid matvec.
Computes ((G - 2p) / sqrt(2p(1-p))) @ v where p is the allele
frequency vector. Each column is centered AND scaled to unit
variance under HWE.
Parameters
----------
v : np.ndarray
Effect vector of shape (m,).
af : np.ndarray, optional
Allele frequencies. If None, recomputed from data.
Returns
-------
np.ndarray
Result of shape (n,).
"""
...
[docs]
@abstractmethod
def recompute_af(self) -> np.ndarray:
"""Recompute empirical allele frequencies from current data.
Returns
-------
np.ndarray
Allele frequencies of shape (m,).
"""
...
[docs]
@abstractmethod
def to_dense(self) -> "DenseHaplotypeArray":
"""Materialize as a DenseHaplotypeArray.
Returns
-------
DenseHaplotypeArray
Dense representation of the haplotype data.
"""
...
[docs]
@abstractmethod
def meiosis(self, assignment, recombination_map, rng=None) -> "HaplotypeOperator":
"""Perform meiosis to produce offspring haplotypes.
Parameters
----------
assignment : MateAssignment
Mate assignment with maternal/paternal indices and offspring metadata.
recombination_map : RecombinationMap
Recombination probabilities between loci.
rng : numpy.random.RandomState, optional
Master RNG used to derive per-offspring crossover seeds. Pass
``self.rng`` from the simulation loop to keep crossover sampling
tied to the simulation's seed; ``None`` falls back to a fresh
``RandomState`` (non-deterministic).
Returns
-------
HaplotypeOperator
Offspring haplotypes.
"""
...
@abstractmethod
def __getitem__(self, key) -> "HaplotypeOperator":
"""Subset by sample/variant indices."""
...
[docs]
class DenseHaplotypeArray(HaplotypeOperator):
"""
Dense numpy-backed haplotype array implementing both old HaplotypeArray
interface and new HaplotypeOperator ABC.
Stores haplotypes as a 3D array with shape (n_samples, m_variants, 2)
where the last dimension represents the two haplotype copies.
Convention: genotypes[:,:,0] = maternal, genotypes[:,:,1] = paternal.
Parameters
----------
genotypes : np.ndarray
3D array of shape (n, m, 2) containing haplotype data.
generation : int, optional
Generation number. Default is 0.
samples : SampleMeta, optional
Sample metadata (iid, fid, sex).
variants : VariantMeta, optional
Variant metadata (vid, chrom, pos_bp, pos_cM, af, alleles).
"""
def __init__(
self,
genotypes: NDArray[Shape["*, *, *"], Int8],
generation: int = 0,
samples: Optional[SampleMeta] = None,
variants: Optional[VariantMeta] = None,
) -> None:
# call base class init
self._generation = generation
# Validate genotypes
if genotypes is None:
genotypes = np.empty((0, 0, 2), dtype=np.int8)
else:
genotypes = np.asarray(genotypes, dtype=np.int8)
if genotypes.ndim != 3 or genotypes.shape[2] != 2:
raise ValueError(
f"genotypes must be 3-D with last dim = 2, got shape {genotypes.shape}"
)
self.genotypes = genotypes
n, m, _ = genotypes.shape
# Create default SampleMeta if not provided
if samples is None:
iid = np.arange(n, dtype=np.int64)
self.samples = SampleMeta(iid=iid, generation=generation)
else:
if samples.n != n:
raise ValueError(
f"samples.n ({samples.n}) must match genotypes.shape[0] ({n})"
)
# Ensure SampleMeta has correct generation
if samples.generation != generation:
self.samples = samples.with_generation(generation)
else:
self.samples = samples
# Create default VariantMeta if not provided
if variants is None:
vid = np.arange(m, dtype=np.int64)
self.variants = VariantMeta(vid=vid)
else:
if variants.m != m:
raise ValueError(
f"variants.m ({variants.m}) must match genotypes.shape[1] ({m})"
)
self.variants = variants
@property
def generation(self) -> int:
"""Generation number."""
return self._generation
@generation.setter
def generation(self, value: int):
self._generation = value
# --- implement base class properties ---
@property
def n(self) -> int:
"""Number of samples."""
return self.genotypes.shape[0]
@property
def m(self) -> int:
"""Number of diploid variants."""
return self.genotypes.shape[1]
@property
def diploid_genotypes(self) -> np.ndarray:
"""Return diploid genotype counts (0, 1, or 2) as 2D array (n, m)."""
return self.genotypes[:, :, 0] + self.genotypes[:, :, 1]
# --- sample metadata accessors ---
@property
def iid(self) -> np.ndarray:
"""Individual IDs."""
return self.samples.iid
@property
def fid(self) -> np.ndarray:
"""Family IDs."""
return self.samples.fid
@property
def sex(self) -> np.ndarray:
"""Biological sex (0=female, 1=male)."""
return self.samples.sex
@property
def n_fam(self) -> int:
"""Number of unique families."""
return self.samples.n_fam
@property
def n_female(self) -> int:
"""Number of biological females."""
return self.samples.n_female
@property
def n_male(self) -> int:
"""Number of biological males."""
return self.samples.n_male
# --- variant metadata accessors ---
@property
def vid(self) -> np.ndarray:
"""Variant IDs."""
return self.variants.vid
@property
def chrom(self) -> Optional[np.ndarray]:
"""Chromosome for each variant."""
return self.variants.chrom
@property
def pos_bp(self) -> Optional[np.ndarray]:
"""Base pair position."""
return self.variants.pos_bp
@property
def pos_cM(self) -> Optional[np.ndarray]:
"""Centimorgan position."""
return self.variants.pos_cM
@property
def af(self) -> Optional[np.ndarray]:
"""Stored allele frequencies (from VariantMeta)."""
return self.variants.af
# --- matrix–vector operations ---
[docs]
def matvec(self, v: np.ndarray) -> np.ndarray:
"""Diploid matrix-vector product: G @ v where G is the (n, m) dosage matrix."""
return self.diploid_genotypes @ v
[docs]
def rmatvec(self, v: np.ndarray) -> np.ndarray:
"""Transpose matrix-vector product: G.T @ v."""
return self.diploid_genotypes.T @ v
[docs]
def standardized_matvec(self, v: np.ndarray, af: np.ndarray = None) -> np.ndarray:
"""Per-SNP standardized matvec: ((G - 2p) / sqrt(2p(1-p))) @ v."""
G_std = self.to_diploid_standardized(af=af, scale=True)
return G_std @ v
[docs]
def standardized_rmatvec(self, v: np.ndarray, af: np.ndarray = None) -> np.ndarray:
"""Per-SNP standardized rmatvec: ((G - 2p) / sqrt(2p(1-p))).T @ v."""
G_std = self.to_diploid_standardized(af=af, scale=True)
return G_std.T @ v
[docs]
def diploid_matvec(self, u: np.ndarray) -> np.ndarray:
"""(G[:, :, 0] + G[:, :, 1]) @ u."""
return (self.genotypes[:, :, 0] @ u) + (self.genotypes[:, :, 1] @ u)
[docs]
def standardized_haploid_matvec(self, u: np.ndarray, haploid: int) -> np.ndarray:
"""
Standardized matvec for one haplotype (0 or 1):
center & scale each variant column, then multiply by u.
"""
H = self.genotypes[:, :, haploid]
col_mean = H.mean(axis=0)
col_std = H.std(axis=0, ddof=1)
col_std[col_std == 0] = 1.0 # avoid divide-by-zero
H_std = (H - col_mean) / col_std
return H_std @ u
# --- subsetting ---
[docs]
def subset(
self,
sample_idx=None,
variant_idx=None,
copy: bool = True,
) -> "DenseHaplotypeArray":
"""
Return a new DenseHaplotypeArray with a subset of samples and/or variants.
Parameters
----------
sample_idx : array-like or slice, optional
Indices of samples to keep.
variant_idx : array-like or slice, optional
Indices of variants to keep.
copy : bool, optional
If True, copy the data. Default True.
Returns
-------
DenseHaplotypeArray
Subsetted haplotype array.
"""
if sample_idx is None:
sample_idx = slice(None)
if variant_idx is None:
variant_idx = slice(None)
new_genotypes = self.genotypes[sample_idx, :, :][:, variant_idx, :]
if copy:
new_genotypes = new_genotypes.copy()
new_samples = self.samples.subset(sample_idx)
new_variants = self.variants.subset(variant_idx)
return DenseHaplotypeArray(
genotypes=new_genotypes,
generation=self.generation,
samples=new_samples,
variants=new_variants,
)
def __getitem__(self, key) -> "DenseHaplotypeArray":
"""
Support numpy/xarray-style indexing: haplotypes[sample_idx] or haplotypes[sample_idx, variant_idx].
Parameters
----------
key : int, slice, array, or tuple
Index or indices for samples (and optionally variants).
Returns
-------
DenseHaplotypeArray
Subsetted haplotype array.
"""
if isinstance(key, tuple):
if len(key) == 1:
sample_idx = key[0]
variant_idx = slice(None)
elif len(key) == 2:
sample_idx, variant_idx = key
else:
raise IndexError(f"Too many indices: {len(key)}")
else:
sample_idx = key
variant_idx = slice(None)
return self.subset(sample_idx=sample_idx, variant_idx=variant_idx, copy=False)
[docs]
def drop_isel(self, sample=None, variant=None) -> "DenseHaplotypeArray":
"""
Drop samples or variants by index (xarray-style compatibility).
Parameters
----------
sample : array-like, optional
Indices of samples to drop.
variant : array-like, optional
Indices of variants to drop.
Returns
-------
DenseHaplotypeArray
Haplotype array with specified samples/variants removed.
"""
sample_idx = slice(None)
variant_idx = slice(None)
if sample is not None:
keep_samples = np.ones(self.n, dtype=bool)
keep_samples[sample] = False
sample_idx = np.where(keep_samples)[0]
if variant is not None:
keep_variants = np.ones(self.m, dtype=bool)
keep_variants[variant] = False
variant_idx = np.where(keep_variants)[0]
return self.subset(sample_idx=sample_idx, variant_idx=variant_idx, copy=True)
# --- Compatibility methods for xarray-style interface ---
@property
def shape(self) -> Tuple[int, int]:
"""Shape as (n_samples, 2*m_variants) for compatibility with 2D expectations."""
return (self.n, 2 * self.m)
@property
def data(self) -> np.ndarray:
"""Return genotypes in 2D interleaved format (n, 2m) for compatibility."""
# Interleave the two haplotypes: [h0_v0, h1_v0, h0_v1, h1_v1, ...]
n, m, _ = self.genotypes.shape
interleaved = np.empty((n, 2 * m), dtype=np.int8)
interleaved[:, 0::2] = self.genotypes[:, :, 0]
interleaved[:, 1::2] = self.genotypes[:, :, 1]
return interleaved
@property
def values(self) -> np.ndarray:
"""Alias for data property."""
return self.data
@property
def attrs(self) -> dict:
"""Return attributes dict for compatibility with xarray interface."""
return {'generation': self.generation}
@property
def af_empirical(self) -> np.ndarray:
"""Compute empirical allele frequencies from genotype data."""
# Mean across samples for each haplotype, then average the two haplotypes
af_hap0 = self.genotypes[:, :, 0].mean(axis=0)
af_hap1 = self.genotypes[:, :, 1].mean(axis=0)
return (af_hap0 + af_hap1) / 2
[docs]
def to_diploid_standardized(self, af: np.ndarray = None, scale: bool = False) -> np.ndarray:
"""
Return standardized diploid genotypes.
Parameters
----------
af : np.ndarray, optional
Allele frequencies to use for standardization. If None, uses empirical.
scale : bool, optional
If True, scale by sqrt(2*p*(1-p)). Default False.
Returns
-------
np.ndarray
Standardized diploid genotypes (n, m).
"""
G = self.diploid_genotypes.astype(np.float64)
if af is None:
af = self.af_empirical
# Center: subtract 2*p (expected value under HWE)
G_centered = G - 2 * af
if scale:
# Scale by sqrt(2*p*(1-p))
denom = np.sqrt(2 * af * (1 - af))
denom[denom == 0] = 1.0 # avoid divide by zero
G_centered = G_centered / denom
return G_centered
# --- Deprecated methods for backward compatibility ---
[docs]
def get_sample_indexer(self) -> "xft.index.SampleIndex":
"""
Create a SampleIndex from this haplotype array.
.. deprecated::
Use the `samples` attribute directly instead.
Returns
-------
xft.index.SampleIndex
Sample indexer with data from this array.
"""
warnings.warn(
"get_sample_indexer() is deprecated. Use the 'samples' attribute directly.",
DeprecationWarning,
stacklevel=2,
)
return xft.index.SampleIndex(
iid=self.samples.iid.astype(str),
fid=self.samples.fid.astype(str),
sex=self.samples.sex,
generation=self.generation,
)
[docs]
def get_variant_indexer(self) -> "xft.index.DiploidVariantIndex":
"""
Create a DiploidVariantIndex from this haplotype array.
.. deprecated::
Use the `variants` attribute directly instead.
Returns
-------
xft.index.DiploidVariantIndex
Variant indexer with data from this array.
"""
warnings.warn(
"get_variant_indexer() is deprecated. Use the 'variants' attribute directly.",
DeprecationWarning,
stacklevel=2,
)
return xft.index.DiploidVariantIndex(
vid=self.variants.vid,
chrom=self.variants.chrom,
pos_bp=self.variants.pos_bp,
pos_cM=self.variants.pos_cM,
af=self.variants.af if self.variants.af is not None else self.af_empirical,
zero_allele=self.variants.zero_allele,
one_allele=self.variants.one_allele,
)
# --- HaplotypeOperator ABC implementation ---
[docs]
def matvec_maternal(self, v: np.ndarray) -> np.ndarray:
"""Maternal haplotype matvec: hap[:,:,0] @ v."""
return self.genotypes[:, :, 0] @ v
[docs]
def matvec_paternal(self, v: np.ndarray) -> np.ndarray:
"""Paternal haplotype matvec: hap[:,:,1] @ v."""
return self.genotypes[:, :, 1] @ v
[docs]
def recompute_af(self) -> np.ndarray:
"""Recompute and return empirical allele frequencies."""
return self.af_empirical
[docs]
def to_dense(self) -> "DenseHaplotypeArray":
"""Return self (already dense)."""
return self
[docs]
def meiosis(self, assignment, recombination_map, rng=None) -> "DenseHaplotypeArray":
"""
Perform meiosis to produce offspring haplotypes.
Delegates to the existing numba-jitted _meiosis_3d kernel in reproduce.py.
Parameters
----------
assignment : MateAssignment
Mate assignment with maternal/paternal indices and offspring metadata.
recombination_map : RecombinationMap
Recombination probabilities between loci.
rng : numpy.random.RandomState, optional
Master RNG; forwarded to ``reproduce.meiosis`` for per-offspring
crossover seeding. ``None`` preserves the prior non-deterministic
behavior.
Returns
-------
DenseHaplotypeArray
Offspring haplotypes with inherited VariantMeta.
"""
from xftsim.reproduce import meiosis as _meiosis_fn
offspring_genotypes = _meiosis_fn(
self,
recombination_map,
assignment.maternal_idx,
assignment.paternal_idx,
rng=rng,
)
return DenseHaplotypeArray(
genotypes=offspring_genotypes,
generation=assignment.offspring_samples.generation,
samples=assignment.offspring_samples,
variants=self.variants,
)
@property
def xft(self) -> "HaplotypeArrayAccessor":
"""Return accessor object for compatibility with xarray .xft interface."""
return HaplotypeArrayAccessor(self)
def __repr__(self) -> str:
parts = [f"n={self.n}", f"m={self.m}", f"generation={self.generation}"]
if self.samples.n_fam != self.n:
parts.append(f"n_fam={self.n_fam}")
return f"DenseHaplotypeArray({', '.join(parts)})"
def _extract_variant_meta_from_grg(grg) -> VariantMeta:
"""Extract VariantMeta from a pygrgl GRG object.
Constructs variant IDs as "{position}:{ref}:{alt}".
Chromosome information is not available from GRG metadata.
"""
m = grg.num_mutations
positions = np.empty(m, dtype=np.int64)
ref_alleles = np.empty(m, dtype=object)
alt_alleles = np.empty(m, dtype=object)
vids = np.empty(m, dtype=object)
for i in range(m):
mut = grg.get_mutation_by_id(i)
positions[i] = int(mut.position)
ref_alleles[i] = str(mut.ref_allele)
alt_alleles[i] = str(mut.allele)
vids[i] = f"{int(mut.position)}:{mut.ref_allele}:{mut.allele}"
return VariantMeta(
vid=vids,
pos_bp=positions,
zero_allele=ref_alleles,
one_allele=alt_alleles,
)
[docs]
class GraphHaplotypeOperator(HaplotypeOperator):
"""GRG-backed haplotype operator using pygrgl graph traversal.
Provides O(nodes)-per-variant matvec without materializing the full
genotype matrix. After meiosis, offspring revert to DenseHaplotypeArray
since GRG has no native recombination support.
Parameters
----------
grg : pygrgl.GRG
Loaded GRG object (via ``pygrgl.load_immutable_grg``).
generation : int
Generation number (default 0).
samples : SampleMeta, optional
Sample metadata. If None, extracted from GRG individual IDs.
variants : VariantMeta, optional
Variant metadata. If None, extracted from GRG mutation data.
"""
def __init__(self, grg, generation: int = 0,
samples: Optional[SampleMeta] = None,
variants: Optional[VariantMeta] = None):
self._grg = grg
self._generation = generation
self._cached_af = None
# When meiosis mutates the GRG, pygrgl.matmul DOWN starts returning
# inflated values because of stale per-node "was-a-sample" flags
# (see meiosis() docstring). Saving + reloading the GRG regenerates
# those flags correctly. We defer the reload until the first
# DOWN-matmul call rather than running it at end-of-meiosis -- if
# no DOWN matmul ever runs, no reload cost is paid.
self._grg_dirty = False
# --- Samples ---
if samples is None:
n = grg.num_individuals
if grg.has_individual_ids:
iid = np.array([grg.get_individual_id(i) for i in range(n)])
else:
iid = np.arange(n, dtype=np.int64)
self.samples = SampleMeta(iid=iid, generation=generation)
else:
if samples.n != grg.num_individuals:
raise ValueError(
f"samples.n ({samples.n}) != grg.num_individuals ({grg.num_individuals})"
)
if samples.generation != generation:
self.samples = samples.with_generation(generation)
else:
self.samples = samples
# --- Variants ---
if variants is None:
self.variants = _extract_variant_meta_from_grg(grg)
else:
if variants.m != grg.num_mutations:
raise ValueError(
f"variants.m ({variants.m}) != grg.num_mutations ({grg.num_mutations})"
)
self.variants = variants
# --- properties ---
@property
def n(self) -> int:
return self._grg.num_individuals
@property
def m(self) -> int:
return self._grg.num_mutations
@property
def generation(self) -> int:
return self._generation
@generation.setter
def generation(self, value: int):
self._generation = value
def _ensure_fresh_grg(self):
"""Historical hook for a save+reload workaround that used to be needed
because pygrgl's matmul DOWN and save_grg both relied on
``getOrderedNodes`` returning a complete and correctly-ordered node
list, and the pre-patch implementation dropped both bubble nodes
(created via ``make_node()`` post-load) and offspring negative nodes
from that list. With the patched ``getOrderedNodes`` in grgl
(see grgl/include/grgl/grg.h ``MutableGRG::getOrderedNodes``),
matmul DOWN traverses the live graph correctly and no reload is
needed. This stub is kept so call sites in ``matvec`` /
``matvec_maternal`` / ``matvec_paternal`` / ``to_dense`` remain
unchanged; remove with the next API cleanup."""
return
# --- matrix-vector operations ---
[docs]
def matvec(self, v: np.ndarray) -> np.ndarray:
"""Diploid G @ v via GRG DOWN traversal (by_individual=True)."""
import pygrgl
self._ensure_fresh_grg()
v = np.asarray(v, dtype=np.float64)
inp = np.atleast_2d(v.T) # (K, m)
out = pygrgl.matmul(self._grg, inp, pygrgl.TraversalDirection.DOWN,
by_individual=True) # (K, n)
result = out.T # (n, K)
if v.ndim == 1:
return result.ravel()
return result
[docs]
def rmatvec(self, v: np.ndarray) -> np.ndarray:
"""G.T @ v via GRG UP traversal (by_individual=True)."""
import pygrgl
v = np.asarray(v, dtype=np.float64)
inp = np.atleast_2d(v.T) # (K, n)
out = pygrgl.matmul(self._grg, inp, pygrgl.TraversalDirection.UP,
by_individual=True) # (K, m)
result = out.T # (m, K)
if v.ndim == 1:
return result.ravel()
return result
[docs]
def matvec_maternal(self, v: np.ndarray) -> np.ndarray:
"""Maternal haplotype matvec via GRG DOWN haploid, even indices."""
import pygrgl
self._ensure_fresh_grg()
v = np.asarray(v, dtype=np.float64)
inp = np.atleast_2d(v.T) # (K, m)
out = pygrgl.matmul(self._grg, inp, pygrgl.TraversalDirection.DOWN,
by_individual=False) # (K, 2n)
maternal = out[:, 0::2] # even = maternal
result = maternal.T # (n, K)
if v.ndim == 1:
return result.ravel()
return result
[docs]
def matvec_paternal(self, v: np.ndarray) -> np.ndarray:
"""Paternal haplotype matvec via GRG DOWN haploid, odd indices."""
import pygrgl
self._ensure_fresh_grg()
v = np.asarray(v, dtype=np.float64)
inp = np.atleast_2d(v.T) # (K, m)
out = pygrgl.matmul(self._grg, inp, pygrgl.TraversalDirection.DOWN,
by_individual=False) # (K, 2n)
paternal = out[:, 1::2] # odd = paternal
result = paternal.T # (n, K)
if v.ndim == 1:
return result.ravel()
return result
[docs]
def standardized_matvec(self, v: np.ndarray, af: np.ndarray = None) -> np.ndarray:
"""Per-SNP standardized diploid matvec (no materialization).
Computes ((G - 2p) / sqrt(2pq)) @ v = (G - 2p) @ (v / sqrt(2pq)).
"""
v = np.asarray(v, dtype=np.float64)
if af is None:
af = self.recompute_af()
# Scale v by 1/sqrt(2pq) so that (G-2p) @ v_scaled = ((G-2p)/sqrt(2pq)) @ v
denom = np.sqrt(2.0 * af * (1.0 - af))
denom[denom == 0] = 1.0
if v.ndim == 1:
v_scaled = v / denom
else:
v_scaled = v / denom[:, np.newaxis]
raw = self.matvec(v_scaled)
correction = 2.0 * (af @ v_scaled)
return raw - correction
[docs]
def recompute_af(self) -> np.ndarray:
"""Compute allele frequencies via GRG UP traversal: G.T @ 1 / (2n)."""
import pygrgl
if self._cached_af is not None:
return self._cached_af
ones = np.ones((1, self.n), dtype=np.float64)
counts = pygrgl.matmul(self._grg, ones, pygrgl.TraversalDirection.UP,
by_individual=True) # (1, m)
self._cached_af = (counts.ravel() / (2.0 * self.n))
return self._cached_af
[docs]
def to_dense(self) -> DenseHaplotypeArray:
"""Materialize full genotype matrix via identity matvec."""
import pygrgl
self._ensure_fresh_grg()
eye = np.eye(self.m, dtype=np.float64) # (m, m)
# DOWN haploid: (m, m) -> (m, 2n)
haploid = pygrgl.matmul(self._grg, eye, pygrgl.TraversalDirection.DOWN,
by_individual=False) # (m, 2n)
# Reshape to (n, m, 2): maternal=even, paternal=odd
maternal = haploid[:, 0::2].T # (n, m)
paternal = haploid[:, 1::2].T # (n, m)
genotypes = np.stack([maternal, paternal], axis=2).astype(np.int8)
return DenseHaplotypeArray(
genotypes=genotypes,
generation=self._generation,
samples=self.samples,
variants=self.variants,
)
[docs]
def meiosis(self, assignment, recombination_map, rng=None) -> "GraphHaplotypeOperator":
"""Perform meiosis natively on the GRG via the bubble-insertion algorithm.
The underlying ``pygrgl.MutableGRG`` is mutated in place: offspring
sample nodes and bubble nodes are added, then ``set_samples`` demotes
the parent generation's samples to internal nodes. The returned
operator wraps the same GRG with the offspring ``SampleMeta``; the
original parent operator becomes stale.
Crossover positions are sampled per locus from
``recombination_map._probabilities`` (matching the dense
``_meiosis_3d`` kernel's distribution), then translated into bp-space
segments using ``self.variants.pos_bp``.
Parameters
----------
assignment : MateAssignment
Maternal/paternal indices into the parent generation and offspring
``SampleMeta``.
recombination_map : RecombinationMap
Per-locus recombination probabilities.
rng : numpy.random.RandomState, optional
Master RNG. Used to derive one independent seed per offspring
via ``SeedSequence.spawn``; each seed is applied via
``np.random.seed`` immediately before that offspring's two
``_meiosis_i`` phase draws, so crossover sampling is
deterministic given ``rng``'s state. Matches the seed-derivation
strategy used by the dense kernel — both paths consume one
``rng.randint`` draw before spawning.
Returns
-------
GraphHaplotypeOperator
Offspring operator wrapping the (mutated) same GRG.
"""
from xftsim.grg_recombination import (
NonDuplicationRecombination,
_phase_to_segments,
)
from xftsim.reproduce import _meiosis_pair_seeded, _spawn_meiosis_seeds
# Use GRG-internal mutation positions, not self.variants.pos_bp.
# The recombination algorithm filters mutations via mut.position, so
# segments must reference the same coordinate system. (xftsim's
# variants.pos_bp may be a sequential-index placeholder -- e.g.,
# founders.founder_haplotypes_from_msprime_grg sets it to
# np.arange(m) -- which would not align with the GRG's true
# bp-space mutations.)
pos_bp = np.fromiter(
(self._grg.get_mutation_by_id(i).position for i in range(self.m)),
dtype=np.int64,
count=self.m,
)
if pos_bp.size > 1 and np.any(np.diff(pos_bp) < 0):
raise ValueError(
"GRG-native meiosis requires mutation positions to be "
"monotonically non-decreasing along mutation-id order."
)
recomb_p = np.ascontiguousarray(recombination_map._probabilities, dtype=np.float64)
recomb = NonDuplicationRecombination(self._grg)
recomb.defer_sample_updates = True
# Convention: get_sample_nodes() returns 2*n_ind haploid IDs in
# individual-interleaved order -- samples[2i] = ind i maternal,
# samples[2i+1] = ind i paternal. This matches matvec_maternal /
# matvec_paternal above (out[:, 0::2] even = maternal).
parent_sample_nodes = list(self._grg.get_sample_nodes())
genome_length = int(self._grg.bp_range[1])
new_offspring_grg_ids = []
n_off = assignment.offspring_samples.n
maternal_idx = assignment.maternal_idx
paternal_idx = assignment.paternal_idx
# One seed per offspring; both mat and pat phase draws for offspring i
# come from the stream seeded by seeds[i]. Matches the dense kernel's
# convention exactly, so given the same `rng` and the same parent
# genotypes both paths sample the same phase vectors.
seeds = _spawn_meiosis_seeds(rng, n_off)
for i in range(n_off):
mat_i = int(maternal_idx[i])
pat_i = int(paternal_idx[i])
mat_haps = (parent_sample_nodes[2 * mat_i],
parent_sample_nodes[2 * mat_i + 1])
pat_haps = (parent_sample_nodes[2 * pat_i],
parent_sample_nodes[2 * pat_i + 1])
# Seed numba's RNG and draw both phase vectors inside one JIT
# call. A Python-level `np.random.seed()` does NOT seed numba's
# internal RNG state, so the seeding must live inside `_meiosis_pair_seeded`
# alongside the draws themselves. Returns (mat_phase, pat_phase),
# matching the dense kernel's seed-once-draw-twice pattern.
mat_phase, pat_phase = _meiosis_pair_seeded(recomb_p, seeds[i])
for parent_haps, phase in ((mat_haps, mat_phase),
(pat_haps, pat_phase)):
segments = _phase_to_segments(phase, pos_bp, parent_haps, genome_length)
neg_id = recomb.recombine_multi(segments)
raw_id = recomb.NEGATIVE_NODE_IDS[abs(neg_id) - 1]
new_offspring_grg_ids.append(raw_id)
# One wholesale sample swap (parents -> offspring), then compact.
new_offspring_grg_ids.sort()
self._grg.set_samples(new_offspring_grg_ids)
recomb._pending_sample_removals.clear()
self._grg.sort_mutations()
# The GRG is now in a state where pygrgl.matmul DOWN returns inflated
# values (every node that was a sample at load time still acts like
# one inside matmul DOWN's traversal, even after set_samples demotes
# it). UP-direction ops are unaffected. We flag the returned operator
# dirty; the first DOWN-matmul call will trigger a lazy save+reload
# that regenerates the per-node sample flags correctly.
offspring_op = GraphHaplotypeOperator(
grg=self._grg,
generation=assignment.offspring_samples.generation,
samples=assignment.offspring_samples,
variants=self.variants,
)
offspring_op._grg_dirty = True
return offspring_op
def __getitem__(self, key) -> DenseHaplotypeArray:
"""Materialize to dense and subset."""
return self.to_dense()[key]
def __repr__(self) -> str:
return (f"GraphHaplotypeOperator(n={self.n}, m={self.m}, "
f"generation={self.generation})")
[docs]
class StandardizedHaplotypeOperator(HaplotypeOperator):
"""Wraps a HaplotypeOperator so that ``matvec``/``rmatvec`` act on
the column-standardized matrix S = (X - mu) / sigma without
materializing S.
Identities used (mu, sigma broadcast across rows of X):
- ``S @ u = H.matvec(u / sigma) - <mu, u / sigma>``
- ``S.T @ v = (H.rmatvec(v) - mu * sum(v)) / sigma``
Defaults follow the HWE convention used elsewhere: ``mu = 2p``,
``sigma = sqrt(2 p (1 - p))`` where ``p`` comes from
``H.recompute_af()``. Loci with ``sigma == 0`` (monomorphic) keep
``sigma = 1`` to avoid division by zero, matching the existing
``standardized_matvec`` implementations.
All other ``HaplotypeOperator`` methods (``matvec_maternal``,
``matvec_paternal``, ``recompute_af``, ``to_dense``, ``meiosis``,
``__getitem__``) forward to the underlying operator and return
raw (un-standardized) results. Re-wrap explicitly if you need
standardized semantics on the result.
Parameters
----------
haplotypes : HaplotypeOperator
Underlying operator providing raw genotype matvec.
means : np.ndarray, optional
Per-variant means (length m). Defaults to ``2 * af``.
stds : np.ndarray, optional
Per-variant standard deviations (length m). Defaults to
``sqrt(2 * af * (1 - af))``. Zeros are replaced with 1.
"""
def __init__(
self,
haplotypes: HaplotypeOperator,
means: Optional[np.ndarray] = None,
stds: Optional[np.ndarray] = None,
):
self._H = haplotypes
if means is None or stds is None:
af = haplotypes.recompute_af()
if means is None:
means = 2.0 * af
if stds is None:
stds = np.sqrt(2.0 * af * (1.0 - af))
means = np.asarray(means, dtype=np.float64)
stds = np.asarray(stds, dtype=np.float64).copy()
if means.shape != (haplotypes.m,):
raise ValueError(
f"means shape {means.shape} != (m,) = ({haplotypes.m},)"
)
if stds.shape != (haplotypes.m,):
raise ValueError(
f"stds shape {stds.shape} != (m,) = ({haplotypes.m},)"
)
stds[stds == 0] = 1.0
self._means = means
self._stds = stds
# --- forwarded metadata ---
@property
def samples(self):
return self._H.samples
@property
def variants(self):
return self._H.variants
@property
def n(self) -> int:
return self._H.n
@property
def m(self) -> int:
return self._H.m
@property
def means(self) -> np.ndarray:
return self._means
@property
def stds(self) -> np.ndarray:
return self._stds
# --- standardized matrix-vector ops ---
[docs]
def matvec(self, v: np.ndarray) -> np.ndarray:
"""S @ v = H.matvec(v / sigma) - <mu, v / sigma>."""
v = np.asarray(v, dtype=np.float64)
if v.ndim == 1:
v_scaled = v / self._stds
raw = self._H.matvec(v_scaled)
return raw - self._means @ v_scaled
v_scaled = v / self._stds[:, np.newaxis]
raw = self._H.matvec(v_scaled)
correction = self._means @ v_scaled # shape (k,)
return raw - correction[np.newaxis, :]
[docs]
def rmatvec(self, v: np.ndarray) -> np.ndarray:
"""S.T @ v = (H.rmatvec(v) - mu * sum(v)) / sigma."""
v = np.asarray(v, dtype=np.float64)
raw = self._H.rmatvec(v)
if v.ndim == 1:
return (raw - self._means * v.sum()) / self._stds
vsum = v.sum(axis=0) # shape (k,)
return (raw - self._means[:, np.newaxis] * vsum[np.newaxis, :]) / self._stds[:, np.newaxis]
[docs]
def standardized_matvec(self, v: np.ndarray, af: np.ndarray = None) -> np.ndarray:
"""Already standardized: equivalent to ``matvec``.
The ``af`` argument is ignored; standardization parameters are
fixed at construction time.
"""
return self.matvec(v)
# --- forwarded ops (raw, un-standardized) ---
[docs]
def matvec_maternal(self, v: np.ndarray) -> np.ndarray:
return self._H.matvec_maternal(v)
[docs]
def matvec_paternal(self, v: np.ndarray) -> np.ndarray:
return self._H.matvec_paternal(v)
[docs]
def recompute_af(self) -> np.ndarray:
return self._H.recompute_af()
[docs]
def to_dense(self) -> "DenseHaplotypeArray":
return self._H.to_dense()
[docs]
def meiosis(self, assignment, recombination_map, rng=None) -> "HaplotypeOperator":
return self._H.meiosis(assignment, recombination_map, rng=rng)
def __getitem__(self, key) -> "HaplotypeOperator":
return self._H[key]
def __repr__(self) -> str:
return (f"StandardizedHaplotypeOperator(H={type(self._H).__name__}, "
f"n={self.n}, m={self.m})")
[docs]
class HaplotypeArrayAccessor:
"""
Accessor class that mimics the xarray .xft interface for DenseHaplotypeArray.
Provides compatibility with code expecting xarray-style access.
"""
def __init__(self, haplotypes: "DenseHaplotypeArray"):
self._haplotypes = haplotypes
@property
def n(self) -> int:
"""Number of samples."""
return self._haplotypes.n
@property
def m(self) -> int:
"""Number of variants."""
return self._haplotypes.m
@property
def generation(self) -> int:
"""Generation number."""
return self._haplotypes.generation
@property
def samples(self) -> SampleMeta:
"""Sample metadata."""
return self._haplotypes.samples
@property
def variants(self) -> VariantMeta:
"""Variant metadata."""
return self._haplotypes.variants
@property
def af_empirical(self) -> np.ndarray:
"""Empirical allele frequencies."""
return self._haplotypes.af_empirical
[docs]
def get_sample_indexer(self) -> "xft.index.SampleIndex":
"""Get sample indexer (deprecated)."""
return self._haplotypes.get_sample_indexer()
[docs]
def get_variant_indexer(self) -> "xft.index.DiploidVariantIndex":
"""Get variant indexer (deprecated)."""
return self._haplotypes.get_variant_indexer()
[docs]
def to_diploid(self) -> np.ndarray:
"""Return diploid genotype counts (0, 1, or 2) as 2D array (n, m)."""
return self._haplotypes.diploid_genotypes
[docs]
def to_diploid_standardized(self, af: np.ndarray = None, scale: bool = False) -> np.ndarray:
"""Return standardized diploid genotypes."""
return self._haplotypes.to_diploid_standardized(af=af, scale=scale)
# ---------------------------------------------------------------------------
# PhenotypeArray — new numpy-backed phenotype container
# ---------------------------------------------------------------------------
[docs]
class PhenotypeArray:
"""
Thin wrapper around a flat dict of named 1-D arrays.
Each key is a component/phenotype name (e.g. 'height.G', 'height').
The dot is purely a human convention — not parsed.
Parameters
----------
samples : SampleMeta
Sample metadata that travels with the data.
values : dict, optional
Initial name → (n,) array mapping.
"""
def __init__(self, samples: SampleMeta, values: Optional[Dict[str, np.ndarray]] = None):
self.samples = samples
self._values: Dict[str, np.ndarray] = {}
if values is not None:
for k, v in values.items():
self[k] = v
def __getitem__(self, key: str) -> np.ndarray:
return self._values[key]
def __setitem__(self, key: str, val: np.ndarray):
val = np.asarray(val, dtype=np.float64)
if val.shape != (self.samples.n,):
raise ValueError(
f"Value for '{key}' has shape {val.shape}, expected ({self.samples.n},)"
)
if key in self._values:
warnings.warn(f"Overwriting existing key '{key}' in PhenotypeArray")
self._values[key] = val
def __contains__(self, key: str) -> bool:
return key in self._values
@property
def keys(self):
"""Return the names of all stored components."""
return self._values.keys()
[docs]
def subset(self, idx) -> "PhenotypeArray":
"""Return a new PhenotypeArray with a subset of samples."""
new_samples = self.samples.subset(idx)
new_values = {k: v[idx].copy() for k, v in self._values.items()}
return PhenotypeArray(samples=new_samples, values=new_values)
def __repr__(self) -> str:
return (f"PhenotypeArray(n={self.samples.n}, "
f"keys={list(self._values.keys())})")
# ---------------------------------------------------------------------------
# PedigreeArray
# ---------------------------------------------------------------------------
[docs]
@dataclass
class PedigreeArray:
"""
Integer index arrays linking offspring to parents.
Produced at reproduction time; consumed by parent/mother/father references
and by filters (TrioFilter, SibPairFilter).
Parameters
----------
offspring_samples : SampleMeta
Metadata for the offspring generation.
maternal_idx : np.ndarray
(n,) indices into the *parent* generation's SampleMeta for each offspring's mother.
paternal_idx : np.ndarray
(n,) indices into the *parent* generation's SampleMeta for each offspring's father.
parent_n : int
Number of individuals in the parent generation (for bounds checking).
"""
offspring_samples: SampleMeta
maternal_idx: np.ndarray
paternal_idx: np.ndarray
parent_n: int
def __post_init__(self):
self.maternal_idx = np.asarray(self.maternal_idx, dtype=np.intp)
self.paternal_idx = np.asarray(self.paternal_idx, dtype=np.intp)
n = self.offspring_samples.n
if len(self.maternal_idx) != n:
raise ValueError(
f"maternal_idx length {len(self.maternal_idx)} != offspring n {n}"
)
if len(self.paternal_idx) != n:
raise ValueError(
f"paternal_idx length {len(self.paternal_idx)} != offspring n {n}"
)
if n > 0:
if np.any(self.maternal_idx < 0) or np.any(self.maternal_idx >= self.parent_n):
raise ValueError(
f"maternal_idx out of bounds [0, {self.parent_n})"
)
if np.any(self.paternal_idx < 0) or np.any(self.paternal_idx >= self.parent_n):
raise ValueError(
f"paternal_idx out of bounds [0, {self.parent_n})"
)