Source code for xftsim.founders

from __future__ import annotations

import warnings
import numpy as np
import pandas as pd
import nptyping as npt
import xarray as xr
from nptyping import NDArray, Int8, Int64, Float64, Bool, Shape, Float, Int
from typing import Any, Hashable, List, Iterable, Callable, Union, Dict
from functools import cached_property
import msprime
import tskit
import pygrgl
import stdpopsim
import tempfile
import subprocess
from shutil import which
import os

import xftsim as xft


[docs] def founder_haplotypes_from_AFs(n: int, afs: Iterable, diploid: bool = True, generation: int = 0) -> xft.struct.DenseHaplotypeArray: """ Generate founder haplotypes from specified allele frequencies. Parameters ---------- n : int Number of individuals to simulate. afs : Iterable Allele frequencies as an iterable of floats (one per variant). diploid : bool, optional Flag indicating if the generated haplotypes should be diploid or haploid. generation : int, optional Generation number for the founders. Default is 0. Returns ------- xft.struct.DenseHaplotypeArray An object representing a set of haplotypes generated from the given allele frequencies. """ afs = np.asarray(afs).ravel() m = len(afs) # Generate 3D genotypes array (n, m, 2) using fully vectorized sampling. # Each haplotype copy is sampled independently from Bernoulli(af). # Broadcasting af over the sample dimension avoids a Python-level loop. af_row = afs[np.newaxis, :] # shape (1, m) genotypes = np.empty((n, m, 2), dtype=np.int8) genotypes[:, :, 0] = (np.random.random((n, m)) < af_row).astype(np.int8) genotypes[:, :, 1] = (np.random.random((n, m)) < af_row).astype(np.int8) # Create sample metadata iid = np.arange(n, dtype=np.int64) samples = xft.struct.SampleMeta(iid=iid) # Create variant metadata with allele frequencies vid = np.arange(m, dtype=np.int64) variants = xft.struct.VariantMeta(vid=vid, af=afs) return xft.struct.DenseHaplotypeArray( genotypes=genotypes, generation=generation, samples=samples, variants=variants, )
[docs] def founder_haplotypes_uniform_AFs(n: int, m: int, minMAF: float = .1, generation: int = 0) -> xft.struct.DenseHaplotypeArray: """ Generate founder haplotypes from uniform-distributed allele frequencies. Parameters ---------- n : int Number of individuals to simulate. m : int Number of variants. minMAF : float, optional Minimum minor allele frequency for generated haplotypes. generation : int, optional Generation number for the founders. Default is 0. Returns ------- xft.struct.DenseHaplotypeArray An object representing a set of haplotypes generated with uniform allele frequencies. """ afs = np.random.uniform(minMAF, 1 - minMAF, m) return founder_haplotypes_from_AFs(n=n, afs=afs, diploid=True, generation=generation)
[docs] def founder_haplotypes_from_sgkit_dataset(gdat: xr.Dataset, generation: int = 0) -> xft.struct.DenseHaplotypeArray: """Construct founder haplotypes array from sgkit DataArray. Useful in conjuction with sgkit.io.vcf.vcf_to_zarr() and sgkit.load_dataset() Parameters ---------- gdat : xr.Dataset Dataset generated by sgkit.load_dataset() generation : int, optional Generation number for the founders. Default is 0. Returns ------- xft.struct.DenseHaplotypeArray Array of founder haplotypes. """ return xft.io.haplotypes_from_sgkit_dataset(gdat, generation=generation)
[docs] def founder_haplotypes_from_msprime_grg( n: int, sequence_length: int, Ne: float = 10000, recombination_rate: float = 1e-8, mutation_rate: float = 1e-8, generation: int = 0, *, binary_muts: bool = False, use_node_times: bool = False, no_simplify: bool = False, maintain_topo: bool = False, ts_coals: bool = False, ) -> xft.struct.GraphHaplotypeOperator: """ Generate founder haplotypes using msprime and return them as a GraphHaplotypeOperator. This function simulates ancestry and mutations for a population of size `n` over a sequence of length `sequence_length`, then converts the resulting TreeSequence into a Genotype Representation Graph (GRG) via the grgl CLI. Parameters ---------- n : int Number of diploid individuals to simulate. sequence_length : int The length of the genomic region to simulate (in base pairs). Ne : float, optional Effective population size. Default is 10000. recombination_rate : float, optional Recombination rate per base pair per generation. Default is 1e-8. mutation_rate : float, optional Mutation rate per base pair per generation. Default is 1e-8. generation : int, optional Generation number for the founders. Default is 0. binary_muts : bool, optional Flag to pass --binary-muts to grgl. use_node_times : bool, optional Flag to pass --ts-node-times to grgl. no_simplify : bool, optional Flag to pass --no-simplify to grgl. maintain_topo : bool, optional Flag to pass --maintain-topo to grgl. ts_coals : bool, optional Flag to pass --ts-coals to grgl to calculate diploid coalescence information. Returns ------- xft.struct.GraphHaplotypeOperator The operator containing the simulated founder graph and metadata. """ import msprime import pygrgl # Step 2: Simulate Ancestry and Mutations ts = msprime.sim_ancestry( samples=n, sequence_length=sequence_length, population_size=Ne, recombination_rate=recombination_rate, ) ts = msprime.sim_mutations(ts, rate=mutation_rate) # Step 3: Convert TreeSequence to GRG (no VCF middleman) with tempfile.TemporaryDirectory() as tmpdir: trees_path = os.path.join(tmpdir, "founders.trees") grg_path = os.path.join(tmpdir, "founders.grg") # Dump the msprime TreeSequence to a .trees file ts.dump(trees_path) # Find the GRGL binary ("grgl") installed with pygrgl grg_bin = which("grg") if grg_bin is None: raise RuntimeError( "Could not find 'grg' on PATH. " "Make sure pygrgl is installed and the environment is activated." ) # Build the same command that `grg convert <trees> <grg>` uses cmd = [grg_bin, "convert", trees_path, grg_path] if binary_muts: cmd.append("--binary-muts") if use_node_times: cmd.append("--ts-node-times") if no_simplify: cmd.append("--no-simplify") if maintain_topo: cmd.append("--maintain-topo") if ts_coals: cmd.append("--ts-coals") # Run GRGL to create the GRG file subprocess.check_call(cmd) # Load the GRG back into Python (mutable so GRG-native meiosis works) grg = pygrgl.load_mutable_grg(grg_path) # Step 4: Extract and Build Metadata iids = np.array([f"ind_{i}" for i in range(ts.num_individuals)], dtype=str) samples = xft.struct.SampleMeta(iid=iids, generation=generation) # Align variants to GRG mutations (placeholder implementation) num_grg_muts = grg.num_mutations # this is an int attribute # You can keep vid as ints: vids = np.arange(num_grg_muts, dtype=int) chroms = np.repeat("1", num_grg_muts) pos_bp = np.arange(num_grg_muts, dtype=int) pos_cM = pos_bp * recombination_rate * 100.0 zero_allele = np.repeat("0", num_grg_muts) one_allele = np.repeat("1", num_grg_muts) variants = xft.struct.VariantMeta( vid=vids, chrom=chroms, pos_bp=pos_bp, pos_cM=pos_cM, zero_allele=zero_allele, one_allele=one_allele, ) # Step 5: Instantiate and Return return xft.struct.GraphHaplotypeOperator( grg=grg, generation=generation, samples=samples, variants=variants )
[docs] def founder_haplotypes_from_stdpopsim_grg( samples: Dict[str, int], model_id: str = "OutOfAfrica_3G09", chromosome: str = "chr22", species_id: str = "HomSap", genetic_map: Union[str, None] = None, left: Union[int, None] = None, right: Union[int, None] = None, mutation_rate: Union[float, None] = None, engine_name: str = "msprime", generation: int = 0, *, binary_muts: bool = False, use_node_times: bool = False, no_simplify: bool = False, maintain_topo: bool = False, ts_coals: bool = False, ) -> xft.struct.GraphHaplotypeOperator: """ Generate founder haplotypes from a stdpopsim demographic model and return them as a GraphHaplotypeOperator. Simulates a TreeSequence using a published stdpopsim demographic model (e.g. ``HomSap`` / ``OutOfAfrica_3G09``) and converts the result to a Genotype Representation Graph (GRG) via the grgl CLI. Parameters ---------- samples : Dict[str, int] Mapping of stdpopsim population name to number of diploid individuals to draw from that population (e.g. ``{"YRI": 100, "CEU": 100, "CHB": 100}``). The available population names depend on the chosen demographic model. model_id : str, optional Identifier of the stdpopsim demographic model. Default is ``"OutOfAfrica_3G09"``. chromosome : str, optional Chromosome identifier passed to ``species.get_contig``. Default is ``"chr22"``. species_id : str, optional Species identifier used by stdpopsim. Default is ``"HomSap"`` (Homo sapiens). genetic_map : str or None, optional Optional stdpopsim genetic map identifier (e.g. ``"HapMapII_GRCh38"``). If ``None``, the contig uses a uniform recombination rate. left : int or None, optional Left coordinate (in base pairs, 0-based inclusive) of a sub-region of the chromosome to simulate. If ``None``, simulation starts at position 0. Use together with ``right`` to shorten simulations for faster tests. right : int or None, optional Right coordinate (in base pairs, exclusive) of a sub-region of the chromosome to simulate. If ``None``, simulation runs to the end of the contig. mutation_rate : float or None, optional Override for the contig's mutation rate. If ``None``, defaults to the demographic model's calibrated mutation rate when one is published (``model.mutation_rate``); otherwise stdpopsim's species/contig default is used. engine_name : str, optional stdpopsim simulation engine to use. Default ``"msprime"``. generation : int, optional Generation number for the founders. Default is 0. binary_muts : bool, optional Flag to pass --binary-muts to grgl. use_node_times : bool, optional Flag to pass --ts-node-times to grgl. no_simplify : bool, optional Flag to pass --no-simplify to grgl. maintain_topo : bool, optional Flag to pass --maintain-topo to grgl. ts_coals : bool, optional Flag to pass --ts-coals to grgl to calculate diploid coalescence information. Returns ------- xft.struct.GraphHaplotypeOperator The operator containing the simulated founder graph and metadata. Notes ----- Sample IIDs are prefixed with the stdpopsim population name (e.g. ``"YRI_0"``, ``"YRI_1"``, ...). The full per-individual population label is also stored on ``samples.extra["population"]`` so it can be used as a grouping variable by :class:`xftsim.arch.GroupingComponent`. Variant positions and alleles are read directly from the GRG; ``pos_cM`` is computed by integrating the contig's recombination map. ``vid`` is formatted as ``"{chromosome}:{pos_bp}:{ref}:{alt}"``. """ # Step 1: Build the stdpopsim simulation specification species = stdpopsim.get_species(species_id) model = species.get_demographic_model(model_id) if mutation_rate is None: mutation_rate = getattr(model, "mutation_rate", None) contig_kwargs = {} if genetic_map is not None: contig_kwargs["genetic_map"] = genetic_map if mutation_rate is not None: contig_kwargs["mutation_rate"] = mutation_rate if left is not None: contig_kwargs["left"] = left if right is not None: contig_kwargs["right"] = right contig = species.get_contig(chromosome, **contig_kwargs) # Step 2: Simulate a TreeSequence with the chosen engine engine = stdpopsim.get_engine(engine_name) ts = engine.simulate(model, contig, samples) # Step 3: Convert TreeSequence to GRG (no VCF middleman) with tempfile.TemporaryDirectory() as tmpdir: trees_path = os.path.join(tmpdir, "founders.trees") grg_path = os.path.join(tmpdir, "founders.grg") ts.dump(trees_path) grg_bin = which("grg") if grg_bin is None: raise RuntimeError( "Could not find 'grg' on PATH. " "Make sure pygrgl is installed and the environment is activated." ) cmd = [grg_bin, "convert", trees_path, grg_path] if binary_muts: cmd.append("--binary-muts") if use_node_times: cmd.append("--ts-node-times") if no_simplify: cmd.append("--no-simplify") if maintain_topo: cmd.append("--maintain-topo") if ts_coals: cmd.append("--ts-coals") subprocess.check_call(cmd) grg = pygrgl.load_mutable_grg(grg_path) # Step 4: Build sample metadata (population labels + prefixed iids) pop_names = np.array( [ts.population(ind.population).metadata["name"] for ind in ts.individuals()], dtype=object, ) iids = np.empty(ts.num_individuals, dtype=object) pop_counter: Dict[str, int] = {} for i, pop in enumerate(pop_names): j = pop_counter.get(pop, 0) iids[i] = f"{pop}_{j}" pop_counter[pop] = j + 1 iids = iids.astype(str) samples_meta = xft.struct.SampleMeta( iid=iids, generation=generation, extra={"population": pop_names}, ) # Step 5: Build variant metadata (real positions, alleles, cumulative cM) num_grg_muts = grg.num_mutations pos_bp = np.empty(num_grg_muts, dtype=np.int64) ref_alleles = np.empty(num_grg_muts, dtype=object) alt_alleles = np.empty(num_grg_muts, dtype=object) for i in range(num_grg_muts): mut = grg.get_mutation_by_id(i) pos_bp[i] = int(mut.position) ref_alleles[i] = str(mut.ref_allele) alt_alleles[i] = str(mut.allele) chroms = np.repeat(str(chromosome), num_grg_muts) vids = np.array( [f"{chromosome}:{p}:{r}:{a}" for p, r, a in zip(pos_bp, ref_alleles, alt_alleles)], dtype=str, ) pos_cM = np.asarray( contig.recombination_map.get_cumulative_mass(pos_bp), dtype=np.float64, ) * 100.0 variants = xft.struct.VariantMeta( vid=vids, chrom=chroms, pos_bp=pos_bp, pos_cM=pos_cM, zero_allele=ref_alleles, one_allele=alt_alleles, ) # Step 6: Instantiate and Return return xft.struct.GraphHaplotypeOperator( grg=grg, generation=generation, samples=samples_meta, variants=variants, )