Source code for xftsim.reproduce

import warnings
import numpy as np
import pandas as pd
import dask.array as da
import nptyping as npt
import xarray as xr
from nptyping import NDArray, Int8, Int64, Float64, Bool, Shape
from typing import Any, Hashable, List, Iterable, Union
from functools import cached_property
import numba as nb
import math
import nptyping as npt

import xftsim as xft


[docs] class RecombinationMap: """A class to represent a diploid recombination map. Parameters ---------- p : float or numpy.ndarray, optional Probabilities, either a float or a numpy.ndarray, default is None. A single value results in an exchangeable map, an array corresponds to probabilities of recombination between specified loci. m : int, optional Number of variants. Required if p is a float. vid : numpy.ndarray, optional Variant IDs, default is None. chrom : numpy.ndarray, optional Chromosomes, default is None. pos_bp : numpy.ndarray, optional Per-variant base-pair positions, default is None. When provided, any locus j where ``pos_bp[j] == pos_bp[j-1]`` has its recombination probability forced to 0: crossover cannot occur between two mutations at the same physical site (e.g., multi-allelic sites where two ALT entries share a position, or msprime-discretized positions that collide). Without this guard, the GRG-native meiosis path's interval-based mutation filter (``bisect_left`` on positions) silently groups all same-position variants into whichever segment claims the position, producing wrong-hap assignments at the few duplicate-position cells. """ def __init__(self, p=None, m: int = None, vid: NDArray[Shape["*"], Any] = None, chrom: NDArray[Shape["*"], Int64] = None, pos_bp: NDArray[Shape["*"], Int64] = None, ): if m is not None: self.m = m elif vid is not None: self.m = len(vid) else: raise ValueError("Must provide m or vid") if vid is None: vid = np.arange(self.m) if chrom is None: chrom = np.zeros(self.m, dtype=np.int64) self.vid = vid self.chrom = chrom self.pos_bp = pos_bp # Chromosome boundaries (where recombination probability is 0.5) self._chrom_boundary = np.concatenate( [[0], 1 + np.where(chrom[1:] != chrom[:-1])[0]]) if isinstance(p, float): assert 0 <= p <= 1, "Provide a valid probability" self._probabilities = np.ones(self.m) * p elif isinstance(p, np.ndarray): assert p.shape[0] == self.m, "p and m must agree in length" self._probabilities = p.copy() else: # Default to 0.5 (free recombination) self._probabilities = np.ones(self.m) * 0.5 # Force chromosome boundaries to have 0.5 probability self._probabilities[self._chrom_boundary] = 0.5 # Force p=0 wherever consecutive variants share a bp position. # A crossover physically cannot land between two mutations at the # same site, and the GRG-native meiosis path cannot represent it # via interval-based segments (bisect_left collapses duplicates). # Forcing p=0 here makes the per-locus phase vector compatible # with the segment-based algorithm: same-position variants always # inherit from the same source hap. if pos_bp is not None: pos_bp_arr = np.asarray(pos_bp) assert pos_bp_arr.shape[0] == self.m, "pos_bp must have length m" same_pos_with_prev = np.concatenate( [[False], pos_bp_arr[1:] == pos_bp_arr[:-1]]) self._probabilities[same_pos_with_prev] = 0.0
[docs] @staticmethod def constant_map(m: int, p: float = 0.5, pos_bp: NDArray[Shape["*"], Int64] = None, ) -> "RecombinationMap": """ Create a constant recombination map. Parameters ---------- m : int Number of variants. p : float, optional Recombination probability, default is 0.5. pos_bp : numpy.ndarray, optional Per-variant base-pair positions; forwarded to ``RecombinationMap.__init__`` to suppress crossovers between same-position variants. See class docstring. Returns ------- RecombinationMap A constant recombination map. """ return RecombinationMap(p=p, m=m, pos_bp=pos_bp)
[docs] @staticmethod def from_haplotypes(haplotypes, p: float = 0.5) -> "RecombinationMap": """ Create a constant recombination map from haplotypes. Threads ``haplotypes.variants.pos_bp`` into the constructor when available so that same-position variants are properly handled (see class docstring). Falls back to None when the haplotype object lacks position metadata, which preserves the historical behavior. Parameters ---------- haplotypes : DenseHaplotypeArray or GraphHaplotypeOperator Haplotypes data. Any object with ``.m`` and ``.vid``; if it also exposes ``.variants.pos_bp``, that's used. p : float, optional Probability, default is 0.5. Returns ------- RecombinationMap A constant recombination map. """ pos_bp = None variants = getattr(haplotypes, "variants", None) if variants is not None: pos_bp = getattr(variants, "pos_bp", None) return RecombinationMap(p=p, m=haplotypes.m, vid=haplotypes.vid, pos_bp=pos_bp)
def __repr__(self): df = pd.DataFrame.from_dict(dict(vid=self.vid, chrom=self.chrom, p=self._probabilities)) return df.__repr__()
@nb.njit def _meiosis_pair_seeded(p, seed): """ Seed numba's RNG and draw (maternal, paternal) phase vectors. Exists because ``np.random.seed()`` at the Python level does NOT propagate to numba's internal RNG state — JIT'd functions like ``_meiosis_i`` read a separate thread-local numba RNG. To make Python callers (e.g. the GRG-native meiosis path) deterministic, the seed and both draws must happen inside a single JIT call. Matches the dense ``_meiosis_3d`` kernel's per-offspring behavior exactly: one seed, then two consecutive ``_meiosis_i`` draws against that just-seeded stream. So given the same per-offspring seed both the dense and GRG meiosis paths sample the same phase vectors, guaranteeing cross-path determinism on top of within-path determinism. No explicit numba signature — let JIT specialize on first call. """ np.random.seed(seed) mat = _meiosis_i(p) pat = _meiosis_i(p) return mat, pat @nb.njit("int64[:](float64[:])") def _meiosis_i(p): """ Maps recombination probabilities to haploid indices (0 or 1). Parameters ---------- p : numpy.ndarray[float64] An array of recombination probabilities. Returns ------- numpy.ndarray[int64] An array of haploid indices (0 or 1). """ m = p.shape[0] output = np.empty(m, dtype=np.int64) for j in range(m): output[j] = np.random.binomial(1, p[j]) output = np.cumsum(output) % 2 return output @nb.njit("int8[:,:,:](int8[:,:,:], int8[:,:,:], int64, int64, float64[:], int64[:], int64[:], uint32[:])", parallel=True) def _meiosis_3d(parental_genotypes, offspring_genotypes, n_offspring, m, recombination_p, maternal_inds, paternal_inds, seeds, ): """ Performs meiosis on 3D genotype arrays. Parameters ---------- parental_genotypes : numpy.ndarray[int8] 3D array of parental genotypes (n_parents, m, 2). offspring_genotypes : numpy.ndarray[int8] 3D array to store offspring genotypes (n_offspring, m, 2). n_offspring : int64 The number of offspring. m : int64 The number of variants. recombination_p : numpy.ndarray[float64] An array of recombination probabilities. maternal_inds : numpy.ndarray[int64] An array of maternal parent indices. paternal_inds : numpy.ndarray[int64] An array of paternal parent indices. seeds : numpy.ndarray[uint32] Per-offspring RNG seeds. ``seeds[i]`` is applied to numba's thread-local RNG before drawing offspring i's maternal/paternal phase vectors, so the resulting genotypes are deterministic given the seed array — independent of how ``nb.prange`` distributes offspring across threads. Length must equal ``n_offspring``. Returns ------- numpy.ndarray[int8] 3D array of offspring genotypes. """ for i in nb.prange(n_offspring): # Re-seed the current thread's RNG with this offspring's seed. # Numba's `np.random.seed()` is per-thread inside `prange`, so this # localizes randomness to the offspring rather than the thread — # which is what makes the output independent of scheduling. np.random.seed(seeds[i]) # Maternal meiosis: select which haplotype (0 or 1) at each locus mat_hap_select = _meiosis_i(recombination_p) # Paternal meiosis: select which haplotype (0 or 1) at each locus pat_hap_select = _meiosis_i(recombination_p) mat_idx = maternal_inds[i] pat_idx = paternal_inds[i] for j in range(m): # Offspring's first haplotype comes from mother offspring_genotypes[i, j, 0] = parental_genotypes[mat_idx, j, mat_hap_select[j]] # Offspring's second haplotype comes from father offspring_genotypes[i, j, 1] = parental_genotypes[pat_idx, j, pat_hap_select[j]] return offspring_genotypes def _spawn_meiosis_seeds(rng: np.random.RandomState, n: int) -> np.ndarray: """Derive ``n`` independent uint32 seeds from ``rng`` via SeedSequence. Centralized so the dense and GRG meiosis paths agree on how seeds are derived from the simulation's master RNG. Both consume exactly one ``rng.randint`` draw before delegating to ``SeedSequence.spawn`` — keeping the master rng state advance the same regardless of which path runs. """ if rng is None: rng = np.random.RandomState() entropy = int(rng.randint(0, 2 ** 31 - 1)) ss = np.random.SeedSequence(entropy) children = ss.spawn(n) return np.fromiter( (c.generate_state(1)[0] for c in children), dtype=np.uint32, count=n, )
[docs] def meiosis(parental_haplotypes: xft.struct.DenseHaplotypeArray, recombination_map: RecombinationMap, maternal_inds: NDArray[Shape["*"], Int64], paternal_inds: NDArray[Shape["*"], Int64], rng: np.random.RandomState = None, ) -> NDArray[Shape["*, *, *"], Int8]: """ Performs meiosis on parental haplotypes. Parameters ---------- parental_haplotypes : DenseHaplotypeArray Parental haplotype data. recombination_map : RecombinationMap Recombination probabilities. maternal_inds : numpy.ndarray[int64] An array of maternal parent indices. paternal_inds : numpy.ndarray[int64] An array of paternal parent indices. rng : numpy.random.RandomState, optional Master RNG. Used to derive per-offspring seeds via ``SeedSequence.spawn`` so that crossover sampling is deterministic given the master state (independent of how ``nb.prange`` schedules offspring across threads). If ``None`` (the default), a fresh ``RandomState`` is constructed, preserving the historical non-deterministic behavior for callers that don't pass an rng. Returns ------- numpy.ndarray[int8] 3D array of offspring genotypes (n_offspring, m, 2). """ parental_genotypes = parental_haplotypes.genotypes m = parental_haplotypes.m recombination_p = recombination_map._probabilities assert parental_genotypes.shape[1] == m, "incompatible dimensions" assert paternal_inds.shape[0] == maternal_inds.shape[0], "incompatible dimensions" assert np.max(maternal_inds) < parental_genotypes.shape[0], "maternal index out of bounds" assert np.max(paternal_inds) < parental_genotypes.shape[0], "paternal index out of bounds" assert parental_genotypes.dtype == np.int8, "genotypes must be int8" assert recombination_p.dtype == np.float64, "recombination_p must be float64" assert maternal_inds.dtype == paternal_inds.dtype == np.int64, "indices must be int64" n_offspring = maternal_inds.shape[0] offspring_genotypes = np.empty((n_offspring, m, 2), dtype=np.int8) seeds = _spawn_meiosis_seeds(rng, n_offspring) return _meiosis_3d(parental_genotypes, offspring_genotypes, n_offspring, m, recombination_p, maternal_inds, paternal_inds, seeds)