from __future__ import annotations
import warnings
import numpy as np
import pandas as pd
import nptyping as npt
import xarray as xr
from nptyping import NDArray, Int8, Int64, Float64, Bool, Shape, Float, Int
from typing import Any, Hashable, List, Iterable, Callable, Union, Dict
from functools import cached_property
import msprime
import tskit
import pygrgl
import stdpopsim
import tempfile
import subprocess
from shutil import which
import os
import xftsim as xft
[docs]
def founder_haplotypes_from_AFs(n: int,
afs: Iterable,
diploid: bool = True,
generation: int = 0) -> xft.struct.DenseHaplotypeArray:
"""
Generate founder haplotypes from specified allele frequencies.
Parameters
----------
n : int
Number of individuals to simulate.
afs : Iterable
Allele frequencies as an iterable of floats (one per variant).
diploid : bool, optional
Flag indicating if the generated haplotypes should be diploid or haploid.
generation : int, optional
Generation number for the founders. Default is 0.
Returns
-------
xft.struct.DenseHaplotypeArray
An object representing a set of haplotypes generated from the given allele frequencies.
"""
afs = np.asarray(afs).ravel()
m = len(afs)
# Generate 3D genotypes array (n, m, 2) using fully vectorized sampling.
# Each haplotype copy is sampled independently from Bernoulli(af).
# Broadcasting af over the sample dimension avoids a Python-level loop.
af_row = afs[np.newaxis, :] # shape (1, m)
genotypes = np.empty((n, m, 2), dtype=np.int8)
genotypes[:, :, 0] = (np.random.random((n, m)) < af_row).astype(np.int8)
genotypes[:, :, 1] = (np.random.random((n, m)) < af_row).astype(np.int8)
# Create sample metadata
iid = np.arange(n, dtype=np.int64)
samples = xft.struct.SampleMeta(iid=iid)
# Create variant metadata with allele frequencies
vid = np.arange(m, dtype=np.int64)
variants = xft.struct.VariantMeta(vid=vid, af=afs)
return xft.struct.DenseHaplotypeArray(
genotypes=genotypes,
generation=generation,
samples=samples,
variants=variants,
)
[docs]
def founder_haplotypes_from_sgkit_dataset(gdat: xr.Dataset,
generation: int = 0) -> xft.struct.DenseHaplotypeArray:
"""Construct founder haplotypes array from sgkit DataArray.
Useful in conjuction with sgkit.io.vcf.vcf_to_zarr() and sgkit.load_dataset()
Parameters
----------
gdat : xr.Dataset
Dataset generated by sgkit.load_dataset()
generation : int, optional
Generation number for the founders. Default is 0.
Returns
-------
xft.struct.DenseHaplotypeArray
Array of founder haplotypes.
"""
return xft.io.haplotypes_from_sgkit_dataset(gdat, generation=generation)
[docs]
def founder_haplotypes_from_plink_bfile(path: str,
generation: int = 0) -> xft.struct.DenseHaplotypeArray:
"""
Reads in PLINK 1 binary genotype data and returns a DenseHaplotypeArray object containing pseudo-haplotypes by
randomly assigning haplotypes at heterozygous sites.
Parameters
----------
path : str
The file path to the PLINK 1 binary genotype data.
generation : int, optional
Generation number for the founders. Default is 0.
Returns
-------
xft.struct.DenseHaplotypeArray
Founder Pseudo-haplotype array. The "pseudo-" prefix refers to the fact that the
plink bfile format doesn't track phase.
"""
return xft.io.read_plink1_as_pseudohaplotypes(path, generation=generation)
[docs]
def founder_haplotypes_from_msprime_grg(
n: int,
sequence_length: int,
Ne: float = 10000,
recombination_rate: float = 1e-8,
mutation_rate: float = 1e-8,
generation: int = 0,
*,
binary_muts: bool = False,
use_node_times: bool = False,
no_simplify: bool = False,
maintain_topo: bool = False,
ts_coals: bool = False,
) -> xft.struct.GraphHaplotypeOperator:
"""
Generate founder haplotypes using msprime and return them as a GraphHaplotypeOperator.
This function simulates ancestry and mutations for a population of size `n`
over a sequence of length `sequence_length`, then converts the resulting
TreeSequence into a Genotype Representation Graph (GRG) via the grgl CLI.
Parameters
----------
n : int
Number of diploid individuals to simulate.
sequence_length : int
The length of the genomic region to simulate (in base pairs).
Ne : float, optional
Effective population size. Default is 10000.
recombination_rate : float, optional
Recombination rate per base pair per generation. Default is 1e-8.
mutation_rate : float, optional
Mutation rate per base pair per generation. Default is 1e-8.
generation : int, optional
Generation number for the founders. Default is 0.
binary_muts : bool, optional
Flag to pass --binary-muts to grgl.
use_node_times : bool, optional
Flag to pass --ts-node-times to grgl.
no_simplify : bool, optional
Flag to pass --no-simplify to grgl.
maintain_topo : bool, optional
Flag to pass --maintain-topo to grgl.
ts_coals : bool, optional
Flag to pass --ts-coals to grgl to calculate diploid coalescence information.
Returns
-------
xft.struct.GraphHaplotypeOperator
The operator containing the simulated founder graph and metadata.
"""
import msprime
import pygrgl
# Step 2: Simulate Ancestry and Mutations
ts = msprime.sim_ancestry(
samples=n,
sequence_length=sequence_length,
population_size=Ne,
recombination_rate=recombination_rate,
)
ts = msprime.sim_mutations(ts, rate=mutation_rate)
# Step 3: Convert TreeSequence to GRG (no VCF middleman)
with tempfile.TemporaryDirectory() as tmpdir:
trees_path = os.path.join(tmpdir, "founders.trees")
grg_path = os.path.join(tmpdir, "founders.grg")
# Dump the msprime TreeSequence to a .trees file
ts.dump(trees_path)
# Find the GRGL binary ("grgl") installed with pygrgl
grg_bin = which("grg")
if grg_bin is None:
raise RuntimeError(
"Could not find 'grg' on PATH. "
"Make sure pygrgl is installed and the environment is activated."
)
# Build the same command that `grg convert <trees> <grg>` uses
cmd = [grg_bin, "convert", trees_path, grg_path]
if binary_muts:
cmd.append("--binary-muts")
if use_node_times:
cmd.append("--ts-node-times")
if no_simplify:
cmd.append("--no-simplify")
if maintain_topo:
cmd.append("--maintain-topo")
if ts_coals:
cmd.append("--ts-coals")
# Run GRGL to create the GRG file
subprocess.check_call(cmd)
# Load the GRG back into Python (mutable so GRG-native meiosis works)
grg = pygrgl.load_mutable_grg(grg_path)
# Step 4: Extract and Build Metadata
iids = np.array([f"ind_{i}" for i in range(ts.num_individuals)], dtype=str)
samples = xft.struct.SampleMeta(iid=iids, generation=generation)
# Align variants to GRG mutations (placeholder implementation)
num_grg_muts = grg.num_mutations # this is an int attribute
# You can keep vid as ints:
vids = np.arange(num_grg_muts, dtype=int)
chroms = np.repeat("1", num_grg_muts)
pos_bp = np.arange(num_grg_muts, dtype=int)
pos_cM = pos_bp * recombination_rate * 100.0
zero_allele = np.repeat("0", num_grg_muts)
one_allele = np.repeat("1", num_grg_muts)
variants = xft.struct.VariantMeta(
vid=vids,
chrom=chroms,
pos_bp=pos_bp,
pos_cM=pos_cM,
zero_allele=zero_allele,
one_allele=one_allele,
)
# Step 5: Instantiate and Return
return xft.struct.GraphHaplotypeOperator(
grg=grg, generation=generation, samples=samples, variants=variants
)
[docs]
def founder_haplotypes_from_stdpopsim_grg(
samples: Dict[str, int],
model_id: str = "OutOfAfrica_3G09",
chromosome: str = "chr22",
species_id: str = "HomSap",
genetic_map: Union[str, None] = None,
left: Union[int, None] = None,
right: Union[int, None] = None,
mutation_rate: Union[float, None] = None,
engine_name: str = "msprime",
generation: int = 0,
*,
binary_muts: bool = False,
use_node_times: bool = False,
no_simplify: bool = False,
maintain_topo: bool = False,
ts_coals: bool = False,
) -> xft.struct.GraphHaplotypeOperator:
"""
Generate founder haplotypes from a stdpopsim demographic model and return them
as a GraphHaplotypeOperator.
Simulates a TreeSequence using a published stdpopsim demographic model
(e.g. ``HomSap`` / ``OutOfAfrica_3G09``) and converts the result to a Genotype
Representation Graph (GRG) via the grgl CLI.
Parameters
----------
samples : Dict[str, int]
Mapping of stdpopsim population name to number of diploid individuals
to draw from that population (e.g. ``{"YRI": 100, "CEU": 100, "CHB": 100}``).
The available population names depend on the chosen demographic model.
model_id : str, optional
Identifier of the stdpopsim demographic model. Default is ``"OutOfAfrica_3G09"``.
chromosome : str, optional
Chromosome identifier passed to ``species.get_contig``. Default is ``"chr22"``.
species_id : str, optional
Species identifier used by stdpopsim. Default is ``"HomSap"`` (Homo sapiens).
genetic_map : str or None, optional
Optional stdpopsim genetic map identifier (e.g. ``"HapMapII_GRCh38"``).
If ``None``, the contig uses a uniform recombination rate.
left : int or None, optional
Left coordinate (in base pairs, 0-based inclusive) of a sub-region of the
chromosome to simulate. If ``None``, simulation starts at position 0.
Use together with ``right`` to shorten simulations for faster tests.
right : int or None, optional
Right coordinate (in base pairs, exclusive) of a sub-region of the
chromosome to simulate. If ``None``, simulation runs to the end of the
contig.
mutation_rate : float or None, optional
Override for the contig's mutation rate. If ``None``, defaults to the
demographic model's calibrated mutation rate when one is published
(``model.mutation_rate``); otherwise stdpopsim's species/contig default
is used.
engine_name : str, optional
stdpopsim simulation engine to use. Default ``"msprime"``.
generation : int, optional
Generation number for the founders. Default is 0.
binary_muts : bool, optional
Flag to pass --binary-muts to grgl.
use_node_times : bool, optional
Flag to pass --ts-node-times to grgl.
no_simplify : bool, optional
Flag to pass --no-simplify to grgl.
maintain_topo : bool, optional
Flag to pass --maintain-topo to grgl.
ts_coals : bool, optional
Flag to pass --ts-coals to grgl to calculate diploid coalescence information.
Returns
-------
xft.struct.GraphHaplotypeOperator
The operator containing the simulated founder graph and metadata.
Notes
-----
Sample IIDs are prefixed with the stdpopsim population name (e.g.
``"YRI_0"``, ``"YRI_1"``, ...). The full per-individual population label
is also stored on ``samples.extra["population"]`` so it can be used as a
grouping variable by :class:`xftsim.arch.GroupingComponent`.
Variant positions and alleles are read directly from the GRG; ``pos_cM``
is computed by integrating the contig's recombination map. ``vid`` is
formatted as ``"{chromosome}:{pos_bp}:{ref}:{alt}"``.
"""
# Step 1: Build the stdpopsim simulation specification
species = stdpopsim.get_species(species_id)
model = species.get_demographic_model(model_id)
if mutation_rate is None:
mutation_rate = getattr(model, "mutation_rate", None)
contig_kwargs = {}
if genetic_map is not None:
contig_kwargs["genetic_map"] = genetic_map
if mutation_rate is not None:
contig_kwargs["mutation_rate"] = mutation_rate
if left is not None:
contig_kwargs["left"] = left
if right is not None:
contig_kwargs["right"] = right
contig = species.get_contig(chromosome, **contig_kwargs)
# Step 2: Simulate a TreeSequence with the chosen engine
engine = stdpopsim.get_engine(engine_name)
ts = engine.simulate(model, contig, samples)
# Step 3: Convert TreeSequence to GRG (no VCF middleman)
with tempfile.TemporaryDirectory() as tmpdir:
trees_path = os.path.join(tmpdir, "founders.trees")
grg_path = os.path.join(tmpdir, "founders.grg")
ts.dump(trees_path)
grg_bin = which("grg")
if grg_bin is None:
raise RuntimeError(
"Could not find 'grg' on PATH. "
"Make sure pygrgl is installed and the environment is activated."
)
cmd = [grg_bin, "convert", trees_path, grg_path]
if binary_muts:
cmd.append("--binary-muts")
if use_node_times:
cmd.append("--ts-node-times")
if no_simplify:
cmd.append("--no-simplify")
if maintain_topo:
cmd.append("--maintain-topo")
if ts_coals:
cmd.append("--ts-coals")
subprocess.check_call(cmd)
grg = pygrgl.load_mutable_grg(grg_path)
# Step 4: Build sample metadata (population labels + prefixed iids)
pop_names = np.array(
[ts.population(ind.population).metadata["name"] for ind in ts.individuals()],
dtype=object,
)
iids = np.empty(ts.num_individuals, dtype=object)
pop_counter: Dict[str, int] = {}
for i, pop in enumerate(pop_names):
j = pop_counter.get(pop, 0)
iids[i] = f"{pop}_{j}"
pop_counter[pop] = j + 1
iids = iids.astype(str)
samples_meta = xft.struct.SampleMeta(
iid=iids,
generation=generation,
extra={"population": pop_names},
)
# Step 5: Build variant metadata (real positions, alleles, cumulative cM)
num_grg_muts = grg.num_mutations
pos_bp = np.empty(num_grg_muts, dtype=np.int64)
ref_alleles = np.empty(num_grg_muts, dtype=object)
alt_alleles = np.empty(num_grg_muts, dtype=object)
for i in range(num_grg_muts):
mut = grg.get_mutation_by_id(i)
pos_bp[i] = int(mut.position)
ref_alleles[i] = str(mut.ref_allele)
alt_alleles[i] = str(mut.allele)
chroms = np.repeat(str(chromosome), num_grg_muts)
vids = np.array(
[f"{chromosome}:{p}:{r}:{a}"
for p, r, a in zip(pos_bp, ref_alleles, alt_alleles)],
dtype=str,
)
pos_cM = np.asarray(
contig.recombination_map.get_cumulative_mass(pos_bp),
dtype=np.float64,
) * 100.0
variants = xft.struct.VariantMeta(
vid=vids,
chrom=chroms,
pos_bp=pos_bp,
pos_cM=pos_cM,
zero_allele=ref_alleles,
one_allele=alt_alleles,
)
# Step 6: Instantiate and Return
return xft.struct.GraphHaplotypeOperator(
grg=grg, generation=generation, samples=samples_meta, variants=variants,
)