Source code for lineage.generator

"""Generate synthetic genotype data for related individuals.

This module extends the SyntheticSNPGenerator from snps to create pairs of
genetically related individuals (parent-child and sibling pairs) suitable
for testing and demonstrating lineage's shared DNA analysis capabilities.
"""

from __future__ import annotations

import logging
import os

import numpy as np
import pandas as pd
from snps.io.generator import SyntheticSNPGenerator

logger = logging.getLogger(__name__)

# Valid nucleotide bases for genotype generation
BASES = np.array(["A", "C", "G", "T"])


[docs] class SyntheticRelatedGenerator(SyntheticSNPGenerator): """Generate synthetic genotype data for related individuals. This class extends SyntheticSNPGenerator to create pairs of genetically related individuals with realistic inheritance patterns. Genetic Relationships: - Parent-Child: Share exactly ONE allele at every position (IBD1 = 100%) - Siblings: Share DNA in segments due to recombination - ~25% IBD2 (both chromosomes identical) - ~50% IBD1 (one chromosome identical) - ~25% IBD0 (no sharing) Parameters ---------- build : int Genome build (36, 37, or 38), default is 37 seed : int, optional Random seed for reproducibility crossovers_per_chrom : float Average number of crossovers per chromosome per meiosis (default: 1.0) Examples -------- >>> gen = SyntheticRelatedGenerator(build=37, seed=42) >>> parent_df, child_df = gen.generate_parent_child_pair(num_snps=10000) >>> sib1_df, sib2_df = gen.generate_sibling_pair(num_snps=10000) """
[docs] def __init__( self, build: int = 37, seed: int | None = None, crossovers_per_chrom: float = 1.0, ) -> None: super().__init__(build=build, seed=seed) self.crossovers_per_chrom = crossovers_per_chrom
[docs] def generate_parent_child_pair( self, num_snps: int = 100000, chromosomes: list[str] | None = None, missing_rate: float = 0.01, discordant_rate: float = 0.0, ) -> tuple[pd.DataFrame, pd.DataFrame]: """Generate a parent-child pair with realistic inheritance. The child inherits exactly one allele from the parent at every position, resulting in 100% IBD1 (one chromosome shared) when analyzed. Parameters ---------- num_snps : int Approximate number of SNPs to generate (default: 100000) chromosomes : list of str, optional Chromosomes to include (default: all autosomes plus X, Y, MT) missing_rate : float Proportion of SNPs with missing genotypes (default: 0.01) discordant_rate : float Proportion of SNPs that are discordant (child doesn't inherit from parent), simulating genotyping errors (default: 0.0) Returns ------- tuple of (pd.DataFrame, pd.DataFrame) (parent_df, child_df) - DataFrames with rsid index, chrom, pos, genotype """ # Generate parent genotypes parent_df = self.generate_snps( num_snps=num_snps, chromosomes=chromosomes, missing_rate=missing_rate, ) # Generate child by inheriting one allele from parent child_df = self._generate_child_from_parent( parent_df, missing_rate, discordant_rate ) return parent_df, child_df
def _generate_child_from_parent( self, parent_df: pd.DataFrame, missing_rate: float, discordant_rate: float = 0.0, ) -> pd.DataFrame: """Generate child genotypes from parent. For each SNP: - Child inherits one allele from parent - Child gets one random allele from "other parent" (simulated) - With discordant_rate probability, child gets a genotype that doesn't match parent (simulating genotyping errors) """ child_df = parent_df.copy() n = len(parent_df) parent_genotypes = parent_df["genotype"].values child_genotypes = np.empty(n, dtype=object) # Generate random alleles for "other parent" other_parent_alleles = BASES[self.rng.integers(0, 4, size=n)] # Determine which SNPs will be discordant discordant_mask = self.rng.random(n) < discordant_rate for i, parent_gt in enumerate(parent_genotypes): if parent_gt == "--": # If parent is missing, child is also missing child_genotypes[i] = "--" elif discordant_mask[i]: # Generate a discordant genotype (no allele matches parent) # Get alleles that are NOT in the parent genotype parent_alleles = set(parent_gt) non_parent_alleles = [b for b in BASES if b not in parent_alleles] if len(non_parent_alleles) >= 2: # Pick two random non-parent alleles chosen = self.rng.choice(non_parent_alleles, size=2, replace=True) child_genotypes[i] = chosen[0] + chosen[1] elif len(non_parent_alleles) == 1: # Homozygous for the non-parent allele child_genotypes[i] = non_parent_alleles[0] + non_parent_alleles[0] else: # Parent is heterozygous with all 4 bases (shouldn't happen) # Fall back to normal inheritance inherited_allele = parent_gt[self.rng.integers(0, 2)] child_genotypes[i] = inherited_allele + other_parent_alleles[i] else: # Child inherits one allele from parent (randomly choose which) inherited_allele = parent_gt[self.rng.integers(0, 2)] other_allele = other_parent_alleles[i] # Combine alleles without sorting to mimic unphased genotype files child_genotypes[i] = inherited_allele + other_allele # Apply missing rate to child (additional missing data) missing_mask = self.rng.random(n) < missing_rate child_genotypes[missing_mask] = "--" child_df["genotype"] = child_genotypes return child_df
[docs] def generate_sibling_pair( self, num_snps: int = 100000, chromosomes: list[str] | None = None, missing_rate: float = 0.01, ) -> tuple[pd.DataFrame, pd.DataFrame]: """Generate a sibling pair with realistic recombination patterns. Siblings share DNA in segments due to recombination during meiosis: - ~25% IBD2 (both chromosomes identical from both parents) - ~50% IBD1 (one chromosome identical) - ~25% IBD0 (no sharing) Parameters ---------- num_snps : int Approximate number of SNPs to generate (default: 100000) chromosomes : list of str, optional Chromosomes to include (default: all autosomes plus X, Y, MT) missing_rate : float Proportion of SNPs with missing genotypes (default: 0.01) Returns ------- tuple of (pd.DataFrame, pd.DataFrame) (sibling1_df, sibling2_df) - DataFrames with rsid index, chrom, pos, genotype """ from snps.constants import REFERENCE_SEQUENCE_CHROMS if chromosomes is None: chromosomes = list(REFERENCE_SEQUENCE_CHROMS) # Generate four parental haplotypes (two per parent) # Parent 1: haplotypes A and B # Parent 2: haplotypes C and D base_snps = self.generate_snps( num_snps=num_snps, chromosomes=chromosomes, missing_rate=0, # Handle missing rate separately inject_build_markers=True, ) # Generate haplotypes for all four parental chromosomes hap_a = self._generate_haplotype(len(base_snps)) hap_b = self._generate_haplotype(len(base_snps)) hap_c = self._generate_haplotype(len(base_snps)) hap_d = self._generate_haplotype(len(base_snps)) # For each sibling, simulate meiosis from each parent sib1_genotypes = self._simulate_sibling_genotypes( base_snps, hap_a, hap_b, hap_c, hap_d, chromosomes ) sib2_genotypes = self._simulate_sibling_genotypes( base_snps, hap_a, hap_b, hap_c, hap_d, chromosomes ) # Create sibling DataFrames sib1_df = base_snps.copy() sib2_df = base_snps.copy() sib1_df["genotype"] = sib1_genotypes sib2_df["genotype"] = sib2_genotypes # Apply missing rate n = len(base_snps) missing_mask_1 = self.rng.random(n) < missing_rate missing_mask_2 = self.rng.random(n) < missing_rate sib1_df.loc[sib1_df.index[missing_mask_1], "genotype"] = "--" sib2_df.loc[sib2_df.index[missing_mask_2], "genotype"] = "--" return sib1_df, sib2_df
def _generate_haplotype(self, n: int) -> np.ndarray: """Generate a random haplotype (single alleles, not genotypes).""" return BASES[self.rng.integers(0, 4, size=n)] def _simulate_sibling_genotypes( self, base_snps: pd.DataFrame, hap_a: np.ndarray, hap_b: np.ndarray, hap_c: np.ndarray, hap_d: np.ndarray, chromosomes: list[str], ) -> np.ndarray: """Simulate genotypes for one sibling via meiosis from both parents. For each chromosome: 1. Simulate recombination in parent 1 (between hap_a and hap_b) 2. Simulate recombination in parent 2 (between hap_c and hap_d) 3. Combine the resulting gametes to form the sibling's genotype """ n = len(base_snps) genotypes = np.empty(n, dtype=object) # Get chromosome boundaries chrom_col = base_snps["chrom"].values for chrom in chromosomes: chrom_mask = chrom_col == chrom chrom_indices = np.where(chrom_mask)[0] if len(chrom_indices) == 0: continue # Simulate meiosis for this chromosome from each parent gamete_from_p1 = self._simulate_meiosis( hap_a[chrom_indices], hap_b[chrom_indices] ) gamete_from_p2 = self._simulate_meiosis( hap_c[chrom_indices], hap_d[chrom_indices] ) # Combine gametes to form genotypes (without sorting to mimic unphased data) for i, idx in enumerate(chrom_indices): genotypes[idx] = gamete_from_p1[i] + gamete_from_p2[i] return genotypes def _simulate_meiosis(self, hap1: np.ndarray, hap2: np.ndarray) -> np.ndarray: """Simulate meiosis with recombination between two haplotypes. Returns a gamete (single haplotype) that is a recombinant of the two inputs. """ n = len(hap1) if n == 0: return np.array([], dtype=object) # Generate crossover points using Poisson distribution num_crossovers = self.rng.poisson(self.crossovers_per_chrom) if num_crossovers > 0 and n > 1: crossover_positions = np.sort(self.rng.integers(1, n, size=num_crossovers)) else: crossover_positions = np.array([], dtype=int) # Build the gamete by alternating between haplotypes at crossover points gamete = np.empty(n, dtype=object) current_hap = hap1 if self.rng.random() < 0.5 else hap2 other_hap = hap2 if current_hap is hap1 else hap1 prev_pos = 0 for pos in crossover_positions: gamete[prev_pos:pos] = current_hap[prev_pos:pos] # Swap haplotypes current_hap, other_hap = other_hap, current_hap prev_pos = pos # Fill in the rest gamete[prev_pos:] = current_hap[prev_pos:] return gamete
[docs] def save_parent_child_pair( self, output_dir: str, parent_format: str = "23andme", child_format: str = "ftdna", num_snps: int = 100000, discordant_rate: float = 0.0, **kwargs, ) -> tuple[str, str]: """Generate and save a parent-child pair to files. Parameters ---------- output_dir : str Directory for output files parent_format : str Format for parent file ('23andme', 'ftdna', 'ancestry') child_format : str Format for child file ('23andme', 'ftdna', 'ancestry') num_snps : int Approximate number of SNPs to generate discordant_rate : float Proportion of SNPs that are discordant (default: 0.0) **kwargs Additional arguments passed to generate_parent_child_pair Returns ------- tuple of (str, str) Paths to (parent_file, child_file) """ os.makedirs(output_dir, exist_ok=True) parent_df, child_df = self.generate_parent_child_pair( num_snps=num_snps, discordant_rate=discordant_rate, **kwargs ) parent_path = self._save_dataframe( parent_df, output_dir, "parent", parent_format ) child_path = self._save_dataframe(child_df, output_dir, "child", child_format) return parent_path, child_path
[docs] def save_sibling_pair( self, output_dir: str, sib1_format: str = "23andme", sib2_format: str = "ftdna", num_snps: int = 100000, **kwargs, ) -> tuple[str, str]: """Generate and save a sibling pair to files. Parameters ---------- output_dir : str Directory for output files sib1_format : str Format for sibling 1 file ('23andme', 'ftdna', 'ancestry') sib2_format : str Format for sibling 2 file ('23andme', 'ftdna', 'ancestry') num_snps : int Approximate number of SNPs to generate **kwargs Additional arguments passed to generate_sibling_pair Returns ------- tuple of (str, str) Paths to (sibling1_file, sibling2_file) """ os.makedirs(output_dir, exist_ok=True) sib1_df, sib2_df = self.generate_sibling_pair(num_snps=num_snps, **kwargs) sib1_path = self._save_dataframe(sib1_df, output_dir, "sibling1", sib1_format) sib2_path = self._save_dataframe(sib2_df, output_dir, "sibling2", sib2_format) return sib1_path, sib2_path
def _save_dataframe( self, df: pd.DataFrame, output_dir: str, name: str, format: str ) -> str: """Save a DataFrame in the specified format.""" format_info = { "23andme": ("txt.gz", self._write_snps_as_23andme), "ftdna": ("csv.gz", self._write_snps_as_ftdna), "ancestry": ("txt.gz", self._write_snps_as_ancestry), } if format not in format_info: raise ValueError( f"Unknown format: {format}. Use: {list(format_info.keys())}" ) ext, writer = format_info[format] path = os.path.join(output_dir, f"{name}.{format}.{ext}") logger.info(f"Creating {os.path.relpath(path)}") writer(df, path) return path