diff --git a/config_templates/config.ini b/config_templates/config.ini index e8573a1..1c95c7e 100644 --- a/config_templates/config.ini +++ b/config_templates/config.ini @@ -13,6 +13,12 @@ [cgenff] SILCSBIODIR = +[gromacs] +; GMX_PATH defaults to looking up "gmx" on PATH +GMX_PATH = +; FORCEFIELD_DIR defaults to /forcefields +FORCEFIELD_DIR = + [storage] ; PARAMETERS defaults to /parameters PARAMETERS = diff --git a/examples/proteinbox/lysozyme_charmm36m.yaml b/examples/proteinbox/lysozyme_charmm36m.yaml new file mode 100644 index 0000000..a67e0e5 --- /dev/null +++ b/examples/proteinbox/lysozyme_charmm36m.yaml @@ -0,0 +1,25 @@ +simulation_type: proteinbox +engine: gromacs +parametrization: pdb2gmx +parametrization_config: + type: pdb2gmx + forcefield: charmm36m + water_model: tip3p + ignh: true + +system: + protein: + resname: LYZ + count: 1 + pdb_path: ./1aki.pdb + disulfide_bonds: + - [6, 127] + - [30, 115] + - [64, 80] + - [76, 94] + protonation_states: + HIS15: HIE + box_padding: 12.0 + ionization: + neutralize: true + concentration: 0.15 diff --git a/mdfactory/build.py b/mdfactory/build.py index 6f728b3..3b98fbe 100644 --- a/mdfactory/build.py +++ b/mdfactory/build.py @@ -1,9 +1,10 @@ # ABOUTME: Core build pipeline for constructing MD simulation systems -# ABOUTME: Dispatches to mixedbox, bilayer, and LNP build routines +# ABOUTME: Dispatches to mixedbox, bilayer, LNP, and proteinbox build routines """Core build pipeline for constructing MD simulation systems.""" import os from functools import partial +from pathlib import Path import MDAnalysis as mda import numpy as np @@ -682,3 +683,136 @@ def ionize_solvated_system(ion_config, u_solvated, total_charge): na_spec = SingleMoleculeSpecies(count=num_na, smiles="[Na+]", resname="NA") cl_spec = SingleMoleculeSpecies(count=num_cl, smiles="[Cl-]", resname="CL") return u_ionized, [na_spec, cl_spec] + + +@validate_call +def build_proteinbox(inp: BuildInput): + """Build a protein-in-waterbox system: clean, parametrize, solvate, ionize, relax.""" + import shutil + + from loguru import logger + + from .models.composition import ProteinBoxComposition + from .models.parametrization import Pdb2gmxConfig + from .setup.protein import ( + check_gmx_available, + clean_pdb, + run_pdb2gmx, + update_topology_molecules, + validate_with_grompp, + ) + + if not isinstance(inp.system, ProteinBoxComposition): + raise TypeError("System must specify a ProteinBoxComposition.") + + if not isinstance(inp.parametrization_config, Pdb2gmxConfig): + raise TypeError("Parametrization config must be Pdb2gmxConfig for proteinbox.") + + # 1. Verify GROMACS is available + gmx_path = check_gmx_available() + logger.info(f"Using GROMACS: {gmx_path}") + + protein = inp.system.protein + config = inp.parametrization_config + + # 2. Clean PDB + pdb_input = protein.pdb_path + if not pdb_input.is_absolute(): + pdb_input = pdb_input.resolve() + + cleaned_pdb = Path("cleaned.pdb") + clean_pdb(pdb_input, cleaned_pdb) + + # 3. Run pdb2gmx + pdb2gmx_dir = Path("pdb2gmx_output") + params = run_pdb2gmx( + pdb_path=cleaned_pdb, + config=config, + disulfide_bonds=protein.disulfide_bonds, + protonation_states=protein.protonation_states, + output_dir=pdb2gmx_dir, + ) + logger.info(f"pdb2gmx output: {params.structure_file}") + + # 4. Center protein in cubic box + u = mda.Universe(str(params.structure_file)) + extent = u.atoms.positions.max(axis=0) - u.atoms.positions.min(axis=0) + box_size = extent.max() + 2 * inp.system.box_padding + u.dimensions = [box_size, box_size, box_size, 90, 90, 90] + center = np.array([box_size / 2, box_size / 2, box_size / 2]) + u.atoms.translate(-u.atoms.center_of_geometry() + center) + logger.info(f"Protein centered in cubic box: {box_size:.1f} A") + + # 5. Solvate (reuse existing solvation) + u_solvated = solvate(u, prune_in_z=False) + u_solvated.dimensions = u.dimensions + n_water = len(u_solvated.select_atoms("water").residues) + logger.info(f"Solvation added {n_water} water molecules.") + + if n_water == 0: + raise ValueError("Solvation did not add any water molecules.") + + # 6. Ionize (reuse existing ionization) + protein_charge = params.total_charge + u_ionized, ion_species = ionize_solvated_system( + inp.system.ionization, u_solvated, protein_charge + ) + num_na = ion_species[0].count if len(ion_species) > 0 else 0 + num_cl = ion_species[1].count if len(ion_species) > 1 else 0 + n_water_final = len(u_ionized.select_atoms("water").residues) + + # 7. Update topology [ molecules ] section + topology_dest = Path("topology.top") + shutil.copy(params.topology_file, topology_dest) + shutil.copy(params.position_restraint_file, Path("posre.itp")) + update_topology_molecules(topology_dest, n_water_final, num_na, num_cl) + + # 8. Write coordinates for OpenMM relaxation + pre_relax_structure = Path("system_pre_relax.pdb").resolve() + u_ionized.atoms.write(str(pre_relax_structure)) + + # 9. OpenMM relaxation with protein position restraints + protein_atoms = u_ionized.select_atoms("protein and not name H*") + protein_indices = protein_atoms.indices.tolist() + topology_dest = topology_dest.resolve() + position_restraint_file = Path("posre.itp").resolve() + + with working_directory("relaxation", create=True, cleanup=True) as wd: + shutil.copy(pre_relax_structure, wd / "system.pdb") + shutil.copy(topology_dest, wd / "topology.top") + shutil.copy(position_restraint_file, wd / "posre.itp") + + # Copy forcefield directory if referenced by topology + # pdb2gmx topologies use #include from the GROMACS data dir, so OpenMM + # needs the GromacsTopFile to resolve them via GMXLIB or local copies + from .simulation.openmm_utils import relax_with_protein_restraints + + u_relaxed = relax_with_protein_restraints( + u_ionized, + wd / "system.pdb", + wd / "topology.top", + protein_indices=protein_indices, + steps=10000, + ) + + logger.info("OpenMM relaxation complete.") + + # 10. Write final output + u_relaxed.atoms.write("system.pdb") + logger.info("Final system written to system.pdb") + + # 11. Validate with grompp (if em.mdp is available) + manager = RunScheduleManager() + manager.copy_run_files_with_check( + engine=inp.engine, system_type="proteinbox", target_folder=os.getcwd(), force_copy=True + ) + logger.info("Run schedule files copied.") + + em_mdp = Path("em.mdp") + if em_mdp.is_file(): + try: + validate_with_grompp(topology_dest, Path("system.pdb"), em_mdp, Path.cwd()) + except RuntimeError as e: + logger.warning(f"grompp validation failed (non-fatal): {e}") + + logger.info("Proteinbox build complete.") diff --git a/mdfactory/models/composition.py b/mdfactory/models/composition.py index 8488d1d..b88bfd3 100644 --- a/mdfactory/models/composition.py +++ b/mdfactory/models/composition.py @@ -1,13 +1,13 @@ -# ABOUTME: Pydantic models for system composition (mixedbox, bilayer, LNP) +# ABOUTME: Pydantic models for system composition (mixedbox, bilayer, LNP, proteinbox) # ABOUTME: Defines species counts, ionization, and composition validation -"""Pydantic models for system composition (mixedbox, bilayer, LNP).""" +"""Pydantic models for system composition (mixedbox, bilayer, LNP, proteinbox).""" from typing import Optional import numpy as np from pydantic import BaseModel, ConfigDict, Field, model_validator -from .species import LipidSpecies, SingleMoleculeSpecies, Species +from .species import LipidSpecies, ProteinSpecies, SingleMoleculeSpecies, Species def distribute_counts(fractions: list[float], total: int) -> list[int]: @@ -375,3 +375,32 @@ def get_species_with_counts(self) -> list[Species]: else: species_map[key] = spec.model_copy() return list(species_map.values()) + + +class ProteinBoxComposition(BaseModel): + """Define a protein-in-waterbox system: one protein centered in a cubic box with ions.""" + + model_config = ConfigDict(extra="forbid") + + protein: ProteinSpecies + box_padding: float = Field( + 10.0, description="Minimum distance from protein to box edge in Angstroms.", ge=0.0 + ) + ionization: IonizationConfig = Field( + default_factory=IonizationConfig, description="Configuration for ionization." + ) + + @property + def species(self) -> list[ProteinSpecies]: + """Return protein as a single-element species list for metadata compatibility.""" + return [self.protein] + + @property + def total_count(self) -> int: + """Return 1 (one protein) for metadata compatibility.""" + return 1 + + @property + def charge(self) -> int: + """Protein charge is not pre-computable; return 0 as placeholder.""" + return 0 diff --git a/mdfactory/models/input.py b/mdfactory/models/input.py index e24497a..b748b2b 100644 --- a/mdfactory/models/input.py +++ b/mdfactory/models/input.py @@ -8,8 +8,18 @@ from pydantic import BaseModel, Field, model_validator -from .composition import BilayerComposition, LNPComposition, MixedBoxComposition -from .parametrization import CgenffConfig, ParametrizationConfig, SmirnoffConfig +from .composition import ( + BilayerComposition, + LNPComposition, + MixedBoxComposition, + ProteinBoxComposition, +) +from .parametrization import ( + CgenffConfig, + ParametrizationConfig, + Pdb2gmxConfig, + SmirnoffConfig, +) # Map system_type to the corresponding model class # TODO: StrEnum? @@ -17,15 +27,16 @@ "mixedbox": MixedBoxComposition, "bilayer": BilayerComposition, "lnp": LNPComposition, + "proteinbox": ProteinBoxComposition, } class BuildInput(BaseModel): """Represent a complete simulation build specification with composition and parametrization.""" - simulation_type: Literal["mixedbox", "bilayer", "lnp"] - system: MixedBoxComposition | BilayerComposition | LNPComposition - parametrization: Literal["cgenff", "smirnoff"] = Field( + simulation_type: Literal["mixedbox", "bilayer", "lnp", "proteinbox"] + system: MixedBoxComposition | BilayerComposition | LNPComposition | ProteinBoxComposition + parametrization: Literal["cgenff", "smirnoff", "pdb2gmx"] = Field( "cgenff", description="Parametrization to use." ) parametrization_config: ParametrizationConfig | None = Field( @@ -74,6 +85,10 @@ def metadata(self) -> dict[str, Any]: elif self.simulation_type == "mixedbox": system_specific["target_density"] = self.system.target_density system_specific["ionization"] = self.system.ionization.model_dump() + elif self.simulation_type == "proteinbox": + system_specific["box_padding"] = self.system.box_padding + system_specific["ionization"] = self.system.ionization.model_dump() + system_specific["pdb_path"] = str(self.system.protein.pdb_path) return { "hash": self.hash, @@ -162,4 +177,6 @@ def set_default_parametrization_config(self) -> "BuildInput": object.__setattr__(self, "parametrization_config", SmirnoffConfig()) elif self.parametrization == "cgenff": object.__setattr__(self, "parametrization_config", CgenffConfig()) + elif self.parametrization == "pdb2gmx": + object.__setattr__(self, "parametrization_config", Pdb2gmxConfig()) return self diff --git a/mdfactory/models/parametrization.py b/mdfactory/models/parametrization.py index 13b9298..e14733e 100644 --- a/mdfactory/models/parametrization.py +++ b/mdfactory/models/parametrization.py @@ -50,8 +50,24 @@ class CgenffConfig(BaseModel): # CGenFF uses SILCSBIODIR from config.ini, no additional settings needed +class Pdb2gmxConfig(BaseModel): + """Configuration for pdb2gmx protein parametrization.""" + + model_config = ConfigDict(extra="forbid", frozen=True) + + type: Literal["pdb2gmx"] = Field("pdb2gmx", description="Config type discriminator.") + forcefield: str = Field( + "charmm36m", description="Force field name as recognized by gmx pdb2gmx." + ) + water_model: str = Field("tip3p", description="Water model name as recognized by gmx pdb2gmx.") + ignh: bool = Field(True, description="Ignore hydrogens in input PDB (regenerate with pdb2gmx).") + merge_all: bool = Field(False, description="Merge all chains into a single moleculetype.") + + ParametrizationConfig = Annotated[ - Annotated[SmirnoffConfig, Tag("smirnoff")] | Annotated[CgenffConfig, Tag("cgenff")], + Annotated[SmirnoffConfig, Tag("smirnoff")] + | Annotated[CgenffConfig, Tag("cgenff")] + | Annotated[Pdb2gmxConfig, Tag("pdb2gmx")], Discriminator("type"), ] @@ -91,3 +107,16 @@ def from_data_row(cls, data_row): raise ValueError("Invalid parameter_data_type for GromacsSingleMoleculeParameterSet.") data = json.loads(data_row["parameter_data"]) return cls(**data) + + +class GromacsProteinParameterSet(BaseModel): + """Store GROMACS topology output from pdb2gmx for a protein system.""" + + model_config = ConfigDict(extra="forbid") + + topology_file: Path + structure_file: Path + position_restraint_file: Path + forcefield: str + water_model: str + total_charge: int diff --git a/mdfactory/models/species.py b/mdfactory/models/species.py index 1912ce5..a41d958 100644 --- a/mdfactory/models/species.py +++ b/mdfactory/models/species.py @@ -1,11 +1,12 @@ -# ABOUTME: Pydantic models for molecular species (small molecules and lipids) +# ABOUTME: Pydantic models for molecular species (small molecules, lipids, proteins) # ABOUTME: Provides SMILES-based identity, charge, and molecular object properties -"""Pydantic models for molecular species (small molecules and lipids).""" +"""Pydantic models for molecular species (small molecules, lipids, proteins).""" import hashlib import io import warnings from functools import cached_property +from pathlib import Path from typing import Optional from pydantic import BaseModel, Field, model_validator @@ -196,3 +197,25 @@ def rdkit_molecule(self) -> "Chem.rdchem.Mol": # u, tail_atom_ids=self.tail_atoms, head_atom_ids=self.head_atoms, z_axis=[1, 0, 0] # ) # return u + + +class ProteinSpecies(Species): + """Represent a protein species identified by a PDB structure file.""" + + count: Optional[int] = Field(1, description="Number of protein copies (always 1).") + fraction: Optional[float] = Field(1.0, description="Fraction (always 1.0 for protein).") + pdb_path: Path = Field(..., description="Path to the input PDB file.") + disulfide_bonds: list[tuple[int, int]] = Field( + default_factory=list, + description="Pairs of residue IDs forming disulfide bonds, e.g. [(6, 127), (30, 115)].", + ) + protonation_states: dict[str, str] = Field( + default_factory=dict, + description="Residue-specific protonation states, e.g. {'HIS15': 'HIE', 'GLU35': 'GLH'}.", + ) + + @property + def charge(self) -> int: + raise NotImplementedError( + "Protein charge is determined by pdb2gmx topology output, not pre-computable." + ) diff --git a/mdfactory/run_schedules/gromacs/proteinbox/em.mdp b/mdfactory/run_schedules/gromacs/proteinbox/em.mdp new file mode 100644 index 0000000..6099eaf --- /dev/null +++ b/mdfactory/run_schedules/gromacs/proteinbox/em.mdp @@ -0,0 +1,20 @@ +; Energy minimization parameters for protein systems (CHARMM36m) +integrator = steep +emtol = 1000.0 +emstep = 0.01 +nsteps = 50000 + +; Neighbor searching +nstlist = 1 +cutoff-scheme = Verlet +ns_type = grid +pbc = xyz + +; Electrostatics and VdW (CHARMM36m settings) +coulombtype = PME +rcoulomb = 1.2 +rvdw = 1.2 +vdwtype = cutoff +vdw-modifier = Force-switch +rvdw-switch = 1.0 +DispCorr = no diff --git a/mdfactory/run_schedules/gromacs/proteinbox/md.mdp b/mdfactory/run_schedules/gromacs/proteinbox/md.mdp new file mode 100644 index 0000000..e2af45d --- /dev/null +++ b/mdfactory/run_schedules/gromacs/proteinbox/md.mdp @@ -0,0 +1,57 @@ +; MD production run for protein systems (CHARMM36m) +title = MD production run + +; Run parameters +integrator = md +nsteps = 50000000 +dt = 0.002 + +; Output control +nstxout = 0 +nstvout = 0 +nstfout = 0 +nstenergy = 5000 +nstlog = 5000 +nstxout-compressed = 5000 +compressed-x-grps = System + +; Bond parameters +continuation = yes +constraint_algorithm = lincs +constraints = h-bonds +lincs_iter = 1 +lincs_order = 4 + +; Neighbor searching +cutoff-scheme = Verlet +ns_type = grid +nstlist = 10 +pbc = xyz + +; Electrostatics and VdW (CHARMM36m settings) +coulombtype = PME +pme_order = 4 +fourierspacing = 0.16 +rcoulomb = 1.2 +rvdw = 1.2 +vdwtype = cutoff +vdw-modifier = Force-switch +rvdw-switch = 1.0 +DispCorr = no + +; Temperature coupling +tcoupl = V-rescale +tc-grps = Protein Non-Protein +tau_t = 0.1 0.1 +ref_t = 300 300 + +; Pressure coupling (Parrinello-Rahman for production) +pcoupl = Parrinello-Rahman +pcoupltype = isotropic +tau_p = 5.0 +ref_p = 1.0 +compressibility = 4.5e-5 +refcoord_scaling = com + +; Velocity generation +gen_vel = no diff --git a/mdfactory/run_schedules/gromacs/proteinbox/npt.mdp b/mdfactory/run_schedules/gromacs/proteinbox/npt.mdp new file mode 100644 index 0000000..95eceb8 --- /dev/null +++ b/mdfactory/run_schedules/gromacs/proteinbox/npt.mdp @@ -0,0 +1,54 @@ +; NPT equilibration for protein systems (CHARMM36m) +title = NPT equilibration +define = -DPOSRES + +; Run parameters +integrator = md +nsteps = 50000 +dt = 0.002 + +; Output control +nstxout = 500 +nstvout = 500 +nstenergy = 500 +nstlog = 500 + +; Bond parameters +continuation = yes +constraint_algorithm = lincs +constraints = h-bonds +lincs_iter = 1 +lincs_order = 4 + +; Neighbor searching +cutoff-scheme = Verlet +ns_type = grid +nstlist = 10 +pbc = xyz + +; Electrostatics and VdW (CHARMM36m settings) +coulombtype = PME +pme_order = 4 +fourierspacing = 0.16 +rcoulomb = 1.2 +rvdw = 1.2 +vdwtype = cutoff +vdw-modifier = Force-switch +rvdw-switch = 1.0 +DispCorr = no + +; Temperature coupling +tcoupl = V-rescale +tc-grps = Protein Non-Protein +tau_t = 0.1 0.1 +ref_t = 300 300 + +; Pressure coupling +pcoupl = Berendsen +pcoupltype = isotropic +tau_p = 2.0 +ref_p = 1.0 +compressibility = 4.5e-5 + +; Velocity generation +gen_vel = no diff --git a/mdfactory/run_schedules/gromacs/proteinbox/nvt.mdp b/mdfactory/run_schedules/gromacs/proteinbox/nvt.mdp new file mode 100644 index 0000000..a85ecce --- /dev/null +++ b/mdfactory/run_schedules/gromacs/proteinbox/nvt.mdp @@ -0,0 +1,52 @@ +; NVT equilibration for protein systems (CHARMM36m) +title = NVT equilibration +define = -DPOSRES + +; Run parameters +integrator = md +nsteps = 50000 +dt = 0.002 + +; Output control +nstxout = 500 +nstvout = 500 +nstenergy = 500 +nstlog = 500 + +; Bond parameters +continuation = no +constraint_algorithm = lincs +constraints = h-bonds +lincs_iter = 1 +lincs_order = 4 + +; Neighbor searching +cutoff-scheme = Verlet +ns_type = grid +nstlist = 10 +pbc = xyz + +; Electrostatics and VdW (CHARMM36m settings) +coulombtype = PME +pme_order = 4 +fourierspacing = 0.16 +rcoulomb = 1.2 +rvdw = 1.2 +vdwtype = cutoff +vdw-modifier = Force-switch +rvdw-switch = 1.0 +DispCorr = no + +; Temperature coupling +tcoupl = V-rescale +tc-grps = Protein Non-Protein +tau_t = 0.1 0.1 +ref_t = 300 300 + +; Pressure coupling off for NVT +pcoupl = no + +; Velocity generation +gen_vel = yes +gen_temp = 300 +gen_seed = -1 diff --git a/mdfactory/run_schedules/run_schedules.yaml b/mdfactory/run_schedules/run_schedules.yaml index 684567e..3a591d4 100644 --- a/mdfactory/run_schedules/run_schedules.yaml +++ b/mdfactory/run_schedules/run_schedules.yaml @@ -36,3 +36,16 @@ num_gpus: 1 num_cores: 8 walltime: 24:00:00 + +- engine: gromacs + system_type: proteinbox + version: 1.0 + run_files: + - em.mdp + - nvt.mdp + - npt.mdp + - md.mdp + settings: + num_gpus: 1 + num_cores: 8 + walltime: 24:00:00 diff --git a/mdfactory/settings.py b/mdfactory/settings.py index 7c99a42..46e7a9c 100644 --- a/mdfactory/settings.py +++ b/mdfactory/settings.py @@ -45,6 +45,10 @@ def _get_defaults() -> dict[str, dict[str, str]]: "cgenff": { "SILCSBIODIR": "", }, + "gromacs": { + "GMX_PATH": "", + "FORCEFIELD_DIR": str(data_dir / "forcefields"), + }, "storage": { "PARAMETERS": str(data_dir / "parameters"), }, @@ -75,6 +79,7 @@ def _get_defaults() -> dict[str, dict[str, str]]: # Keep DEFAULT_CONFIG as class attribute for backward compat in tests DEFAULT_CONFIG = { "cgenff": {"SILCSBIODIR": ""}, + "gromacs": {"GMX_PATH": "", "FORCEFIELD_DIR": ""}, "storage": {"PARAMETERS": ""}, "database": {"TYPE": "sqlite"}, "databases": { @@ -105,6 +110,10 @@ def __init__(self): self._load_config() if "cgenff" in self.config: os.environ.setdefault("SILCSBIODIR", str(self.cgenff_dir)) + if "gromacs" in self.config: + env = self.gromacs_env(os.environ) + if "GMXLIB" in env: + os.environ["GMXLIB"] = env["GMXLIB"] def _load_config(self): """Load configuration from defaults, then user config, then env overrides.""" @@ -146,6 +155,7 @@ def ensure_dirs(self): get_config_dir().mkdir(parents=True, exist_ok=True) get_data_dir().mkdir(parents=True, exist_ok=True) self.parameter_store.mkdir(parents=True, exist_ok=True) + self.gromacs_forcefield_dir.mkdir(parents=True, exist_ok=True) # --- CGenFF properties --- @@ -159,6 +169,37 @@ def cgenff_dir(self) -> Path: """Return the CGenFF directory (SILCSBIODIR).""" return Path(self.config["cgenff"].get("SILCSBIODIR", "")).resolve() + # --- GROMACS properties --- + + @property + def gromacs_gmx_path(self) -> Path | None: + """Return the configured gmx binary path, or None to use PATH lookup.""" + raw = self.config["gromacs"].get("GMX_PATH", "") + if not raw: + return None + return Path(self._resolve_local_path(raw)) + + @property + def gromacs_forcefield_dir(self) -> Path: + """Return the directory where downloaded force fields are stored.""" + raw = self.config["gromacs"].get("FORCEFIELD_DIR", "") + if raw: + return Path(self._resolve_local_path(raw)) + return get_data_dir() / "forcefields" + + def gromacs_env(self, base_env: dict[str, str] | None = None) -> dict[str, str]: + """Return an environment with configured GROMACS force fields on GMXLIB.""" + env = dict(os.environ if base_env is None else base_env) + forcefield_dir = str(self.gromacs_forcefield_dir) + if not forcefield_dir: + return env + + paths = [p for p in env.get("GMXLIB", "").split(":") if p] + if forcefield_dir not in paths: + paths.insert(0, forcefield_dir) + env["GMXLIB"] = ":".join(paths) + return env + # --- Storage properties --- @property diff --git a/mdfactory/setup/protein.py b/mdfactory/setup/protein.py new file mode 100644 index 0000000..ddb3d4e --- /dev/null +++ b/mdfactory/setup/protein.py @@ -0,0 +1,714 @@ +# ABOUTME: Protein preparation utilities for the proteinbox simulation type +# ABOUTME: Wraps PDBFixer for cleaning and gmx pdb2gmx for topology generation +"""Protein preparation utilities for the proteinbox simulation type.""" + +import io +import shutil +import subprocess +import tarfile +from pathlib import Path +from urllib.request import urlopen + +from loguru import logger + +from ..models.parametrization import GromacsProteinParameterSet, Pdb2gmxConfig + +_DISULFIDE_CUTOFF_ANGSTROM = 2.2 + +FORCEFIELD_REGISTRY: dict[str, dict[str, str]] = { + "charmm36m": { + "url": "http://mackerell.umaryland.edu/download.php?filename=CHARMM_ff_params_files/charmm36-feb2026_cgenff-5.0.ff.tgz", + "dirname": "charmm36-feb2026_cgenff-5.0.ff", + "description": "CHARMM36m protein + CGenFF 5.0 (Feb 2026)", + }, + "charmm36m-ljpme": { + "url": "http://mackerell.umaryland.edu/download.php?filename=CHARMM_ff_params_files/charmm36-feb2026_ljpme_cgenff-5.0.ff.tgz", + "dirname": "charmm36-feb2026_ljpme_cgenff-5.0.ff", + "description": "CHARMM36m + CGenFF 5.0, LJ-PME variant (Feb 2026)", + }, +} + + +def get_forcefield_dir() -> Path: + """Return the directory where downloaded force fields are stored.""" + from ..settings import settings # noqa: PLC0415 + + return settings.gromacs_forcefield_dir + + +def get_gromacs_env(base_env: dict[str, str] | None = None) -> dict[str, str]: + """Return an environment with configured GROMACS force fields on GMXLIB.""" + from ..settings import settings # noqa: PLC0415 + + return settings.gromacs_env(base_env) + + +def _resolve_protonation_residue_name(residue_name: str, forcefield: str | None) -> str: + """Return the residue name to write into the PDB for a protonation override.""" + residue_name = residue_name.upper() + if forcefield and forcefield.lower().startswith("charmm"): + residue_name = { + "HID": "HSD", + "HIE": "HSE", + "HIP": "HSP", + }.get(residue_name, residue_name) + + if len(residue_name) != 3: + raise ValueError( + f"Protonation residue name '{residue_name}' cannot be written to PDB " + "without shifting columns. Use a 3-character residue name supported by " + "the selected force field." + ) + return residue_name + + +def _distance(coords_a: tuple[float, float, float], coords_b: tuple[float, float, float]) -> float: + return sum((a - b) ** 2 for a, b in zip(coords_a, coords_b, strict=True)) ** 0.5 + + +def _read_cysteine_sg_atoms(pdb_path: Path) -> dict[int, tuple[float, float, float]]: + """Read cysteine SG atom coordinates keyed by residue id.""" + atoms: dict[int, tuple[float, float, float]] = {} + duplicate_resids = set() + + with open(pdb_path) as f: + for line in f: + if not line.startswith(("ATOM", "HETATM")) or len(line) < 54: + continue + + atom_name = line[12:16].strip() + resname = line[17:20].strip() + if atom_name != "SG" or resname not in {"CYS", "CYM", "CYX"}: + continue + + resid = int(line[22:26].strip()) + coords = ( + float(line[30:38].strip()), + float(line[38:46].strip()), + float(line[46:54].strip()), + ) + if resid in atoms: + duplicate_resids.add(resid) + atoms[resid] = coords + + if duplicate_resids: + raise ValueError( + "Disulfide bond overrides use residue ids only and cannot disambiguate " + f"duplicate cysteine residue ids: {sorted(duplicate_resids)}." + ) + return atoms + + +def _build_disulfide_prompt_input( + pdb_path: Path, + disulfide_bonds: list[tuple[int, int]], + cutoff_angstrom: float = _DISULFIDE_CUTOFF_ANGSTROM, +) -> str: + """Build deterministic yes/no input for pdb2gmx disulfide prompts.""" + requested = {tuple(sorted(pair)) for pair in disulfide_bonds} + for pair in requested: + if pair[0] == pair[1]: + raise ValueError(f"Invalid disulfide bond with identical residues: {pair}") + + sg_atoms = _read_cysteine_sg_atoms(pdb_path) + missing_resids = sorted( + {resid for pair in requested for resid in pair if resid not in sg_atoms} + ) + if missing_resids: + raise ValueError( + f"Requested disulfide residues are not cysteine SG atoms in {pdb_path}: " + f"{missing_resids}" + ) + + candidate_pairs = [] + resids = sorted(sg_atoms) + for i, resid_a in enumerate(resids): + for resid_b in resids[i + 1 :]: + if _distance(sg_atoms[resid_a], sg_atoms[resid_b]) <= cutoff_angstrom: + candidate_pairs.append((resid_a, resid_b)) + + missing_pairs = sorted(requested - set(candidate_pairs)) + if missing_pairs: + raise ValueError( + "Requested disulfide bonds were not detected as close CYS SG pairs " + f"within {cutoff_angstrom:.1f} A in {pdb_path}: {missing_pairs}" + ) + + answers = ["y" if pair in requested else "n" for pair in candidate_pairs] + logger.info(f"pdb2gmx disulfide candidates: {candidate_pairs}; requested: {sorted(requested)}") + return "\n".join(answers) + "\n" + + +def _apply_protonation_states( + pdb_path: Path, + protonation_states: dict[str, str], + output_dir: Path, + forcefield: str | None = None, +) -> Path: + """Apply protonation state overrides by renaming residues in the PDB. + + pdb2gmx selects protonation states based on residue names. This function renames + matching residues in the PDB so pdb2gmx applies the requested 3-character residue + names without interactive prompts. Common AMBER histidine names are translated to + CHARMM names when a CHARMM force field is selected. + + Parameters + ---------- + pdb_path : Path + Input PDB file (absolute path). + protonation_states : dict[str, str] + Mapping of "RESNAME" to target residue name, e.g. + {"HIS15": "HID", "GLU35": "GLH"}. + output_dir : Path + Directory for the modified PDB. + forcefield : str | None + Selected pdb2gmx force field, used for force-field-specific aliases. + + Returns + ------- + Path + Path to the modified PDB file. + + """ + import re + + output_pdb = output_dir / "protonation_adjusted.pdb" + + # Parse overrides: "HIS15" -> (original_resname_prefix="HIS", resid=15, new_name="HID") + overrides = [] + for key, new_name in protonation_states.items(): + match = re.fullmatch(r"([A-Za-z]+)(\d+)", key) + if not match: + raise ValueError( + f"Invalid protonation state key '{key}'. " + "Expected format: RESNAME, e.g. 'HIS15'." + ) + resname_prefix = match.group(1).upper() + resid = int(match.group(2)) + overrides.append( + (resname_prefix, resid, _resolve_protonation_residue_name(new_name, forcefield)) + ) + + with open(pdb_path) as f_in, open(output_pdb, "w") as f_out: + for line in f_in: + output_line = line + if output_line.startswith(("ATOM", "HETATM")) and len(output_line) >= 26: + current_resname = output_line[17:20].strip() + try: + current_resid = int(output_line[22:26].strip()) + except ValueError: + f_out.write(output_line) + continue + + for prefix, target_resid, new_name in overrides: + if current_resname.startswith(prefix) and current_resid == target_resid: + padded = f"{new_name:<3s}" + output_line = output_line[:17] + padded + output_line[20:] + break + f_out.write(output_line) + + logger.info(f"Protonation states applied: {protonation_states}") + return output_pdb + + +def check_gmx_available() -> Path: + """Verify that the GROMACS gmx binary is available. + + Checks the configured path from settings first, then falls back to PATH. + + Returns + ------- + Path + Path to the gmx binary. + + Raises + ------ + RuntimeError + If gmx is not found. + + """ + from ..settings import settings # noqa: PLC0415 + + configured = settings.gromacs_gmx_path + if configured is not None: + if configured.is_file(): + return configured + raise RuntimeError( + f"Configured GROMACS path '{configured}' does not exist. " + "Check [gromacs] GMX_PATH in your mdfactory config." + ) + + gmx_path = shutil.which("gmx") + if gmx_path is None: + raise RuntimeError( + "GROMACS 'gmx' binary not found on PATH. " + "Either set [gromacs] GMX_PATH in mdfactory config, " + "or ensure 'gmx' is on PATH (e.g. 'module load gromacs')." + ) + return Path(gmx_path) + + +def download_forcefield(name: str) -> Path: + """Download a force field from the registry to the local data directory. + + Parameters + ---------- + name : str + Registry key (e.g. "charmm36m"). + + Returns + ------- + Path + Path to the extracted force field directory. + + Raises + ------ + ValueError + If the name is not in the registry. + + """ + if name not in FORCEFIELD_REGISTRY: + raise ValueError( + f"Force field '{name}' is not in the download registry. " + f"Available: {sorted(FORCEFIELD_REGISTRY.keys())}." + ) + + entry = FORCEFIELD_REGISTRY[name] + ff_dir = get_forcefield_dir() + ff_dir.mkdir(parents=True, exist_ok=True) + + target = ff_dir / entry["dirname"] + if target.is_dir(): + logger.info(f"Force field '{name}' already present at {target}") + return target + + logger.info(f"Downloading force field '{name}' from {entry['url']}...") + response = urlopen(entry["url"], timeout=120) # noqa: S310 + data = io.BytesIO(response.read()) + + with tarfile.open(fileobj=data, mode="r:gz") as tar: + tar.extractall(path=ff_dir, filter="data") + + if not target.is_dir(): + raise RuntimeError( + f"Download succeeded but expected directory '{entry['dirname']}' " + f"not found in {ff_dir}. Tarball may have unexpected structure." + ) + + logger.info(f"Force field '{name}' installed to {target}") + return target + + +def download_all_forcefields() -> list[Path]: + """Download all force fields in the registry. Idempotent.""" + paths = [] + for name in FORCEFIELD_REGISTRY: + paths.append(download_forcefield(name)) + return paths + + +def _get_gmx_search_paths() -> list[Path]: + """Return the directories GROMACS searches for force fields.""" + search_paths: list[Path] = [] + + gmx_bin = str(check_gmx_available()) + result = subprocess.run( + [gmx_bin, "--version"], + text=True, + capture_output=True, + timeout=10, + check=False, + ) + for line in result.stdout.splitlines(): + if "Data prefix" in line: + data_prefix = Path(line.split(":", 1)[1].strip()) + top_dir = data_prefix / "share" / "gromacs" / "top" + if top_dir.is_dir(): + search_paths.append(top_dir) + break + + gmxlib = get_gromacs_env().get("GMXLIB", "") + for p in gmxlib.split(":"): + if p and Path(p).is_dir(): + search_paths.append(Path(p)) + + search_paths.append(Path.cwd()) + return search_paths + + +def resolve_forcefield(forcefield: str) -> str: + """Resolve a friendly force field name to the actual directory stem. + + If the name is a registry key (e.g. "charmm36m"), returns the actual + directory name (e.g. "charmm36-feb2026_cgenff-5.0"). Otherwise returns + the name unchanged (for built-in FFs like "charmm27"). + + """ + if forcefield in FORCEFIELD_REGISTRY: + return FORCEFIELD_REGISTRY[forcefield]["dirname"].removesuffix(".ff") + return forcefield + + +def check_forcefield_available(forcefield: str) -> None: + """Verify that the requested force field is findable by GROMACS. + + Parameters + ---------- + forcefield : str + Force field name as passed to ``gmx pdb2gmx -ff``, or a registry + key (e.g. "charmm36m"). + + Raises + ------ + ValueError + If the force field is not found and not in the registry. + + """ + # Resolve friendly name to actual dirname + resolved = resolve_forcefield(forcefield) + ff_dirname = f"{resolved}.ff" + + search_paths = _get_gmx_search_paths() + + for search_dir in search_paths: + if (search_dir / ff_dirname).is_dir(): + return + + available = set() + for search_dir in search_paths: + if search_dir.is_dir(): + for entry in search_dir.iterdir(): + if entry.is_dir() and entry.name.endswith(".ff"): + available.add(entry.stem) + + raise ValueError( + f"Force field '{forcefield}' not found. " + f"Searched: {[str(p) for p in search_paths]}.\n" + f"Available force fields: {sorted(available)}.\n" + f"Downloadable: {sorted(FORCEFIELD_REGISTRY.keys())}.\n" + "Install the force field to a search path, set [gromacs] FORCEFIELD_DIR, " + "or download registered force fields with `mdfactory config init`." + ) + + +def clean_pdb(pdb_path: Path, output_path: Path) -> Path: + """Clean a PDB file using PDBFixer: remove heterogens, add missing heavy atoms. + + Parameters + ---------- + pdb_path : Path + Input PDB file. + output_path : Path + Where to write the cleaned PDB. + + Returns + ------- + Path + Path to the cleaned PDB file. + + """ + from pdbfixer import PDBFixer # noqa: PLC0415 + + fixer = PDBFixer(filename=str(pdb_path)) + fixer.removeHeterogens(keepWater=False) + fixer.findMissingResidues() + fixer.findMissingAtoms() + fixer.addMissingAtoms() + + from openmm.app import PDBFile # noqa: PLC0415 + + with open(output_path, "w") as f: + PDBFile.writeFile(fixer.topology, fixer.positions, f) + + logger.info(f"PDB cleaned: {pdb_path} -> {output_path}") + return output_path + + +def run_pdb2gmx( + pdb_path: Path, + config: Pdb2gmxConfig, + disulfide_bonds: list[tuple[int, int]], + protonation_states: dict[str, str], + output_dir: Path, +) -> GromacsProteinParameterSet: + """Run gmx pdb2gmx to generate GROMACS topology from a protein PDB. + + Parameters + ---------- + pdb_path : Path + Cleaned PDB file. + config : Pdb2gmxConfig + Force field and water model configuration. + disulfide_bonds : list[tuple[int, int]] + Residue ID pairs for disulfide bonds. + protonation_states : dict[str, str] + Residue-specific protonation state overrides. + output_dir : Path + Directory for output files. + + Returns + ------- + GromacsProteinParameterSet + Paths to generated topology, structure, and position restraint files. + + """ + gmx_bin = str(check_gmx_available()) + check_forcefield_available(config.forcefield) + resolved_ff = resolve_forcefield(config.forcefield) + + output_dir.mkdir(parents=True, exist_ok=True) + + # Resolve all paths to absolute to avoid cwd confusion with subprocess + pdb_path = pdb_path.resolve() + output_dir = output_dir.resolve() + + structure_file = output_dir / "processed.gro" + topology_file = output_dir / "topol.top" + posre_file = output_dir / "posre.itp" + + cmd = [ + gmx_bin, + "pdb2gmx", + "-f", + str(pdb_path), + "-o", + str(structure_file), + "-p", + str(topology_file), + "-i", + str(posre_file), + "-ff", + resolved_ff, + "-water", + config.water_model, + ] + + if config.ignh: + cmd.append("-ignh") + + if config.merge_all: + cmd.extend(["-merge", "all"]) + + # Apply protonation state overrides by renaming residues in the PDB + # before pdb2gmx processes it (standard approach for CHARMM/AMBER FFs) + if protonation_states: + pdb_path = _apply_protonation_states( + pdb_path, protonation_states, output_dir, forcefield=config.forcefield + ) + cmd[cmd.index("-f") + 1] = str(pdb_path) + + pdb2gmx_input = "" + if disulfide_bonds: + cmd.append("-ss") + pdb2gmx_input = _build_disulfide_prompt_input(pdb_path, disulfide_bonds) + + logger.info(f"Running pdb2gmx: {' '.join(cmd)}") + + result = subprocess.run( + cmd, + cwd=str(output_dir), + text=True, + input=pdb2gmx_input, + capture_output=True, + timeout=120, + env=get_gromacs_env(), + check=False, + ) + + if result.returncode != 0: + raise RuntimeError( + f"gmx pdb2gmx failed (exit code {result.returncode}).\n" + f"stdout:\n{result.stdout}\n" + f"stderr:\n{result.stderr}" + ) + + logger.info("pdb2gmx completed successfully.") + + # Verify expected outputs exist + for path in [structure_file, topology_file, posre_file]: + if not path.is_file(): + raise FileNotFoundError( + f"Expected pdb2gmx output not found: {path}\n" + f"stdout:\n{result.stdout}\n" + f"stderr:\n{result.stderr}" + ) + + total_charge = extract_charge_from_topology(topology_file) + logger.info(f"Protein net charge from topology: {total_charge}") + + return GromacsProteinParameterSet( + topology_file=topology_file, + structure_file=structure_file, + position_restraint_file=posre_file, + forcefield=config.forcefield, + water_model=config.water_model, + total_charge=total_charge, + ) + + +def extract_charge_from_topology(top_path: Path) -> int: + """Parse the topology file to determine the net system charge. + + Reads the [ atoms ] section(s) and sums the charge column. + + Parameters + ---------- + top_path : Path + Path to the GROMACS .top file. + + Returns + ------- + int + Net formal charge (rounded to nearest integer). + + """ + total_charge = 0.0 + in_atoms = False + + with open(top_path) as f: + for line in f: + stripped = line.strip() + + if stripped.startswith("#include"): + # Follow includes only for local itp files (protein chains, + # position restraints), not force field library files + include_path = stripped.split('"')[1] if '"' in stripped else None + if include_path and ".ff/" not in include_path: + itp_path = top_path.parent / include_path + if itp_path.is_file(): + total_charge += _sum_charges_from_itp(itp_path) + continue + + if stripped.startswith("["): + section = stripped.strip("[] ").lower() + in_atoms = section == "atoms" + continue + + if in_atoms and stripped and not stripped.startswith(";"): + parts = stripped.split() + if len(parts) >= 7: + try: + total_charge += float(parts[6]) + except (ValueError, IndexError): + pass + + return round(total_charge) + + +def _sum_charges_from_itp(itp_path: Path) -> float: + """Sum charges from the [ atoms ] section of an .itp file.""" + total = 0.0 + in_atoms = False + + with open(itp_path) as f: + for line in f: + stripped = line.strip() + if stripped.startswith("["): + section = stripped.strip("[] ").lower() + in_atoms = section == "atoms" + continue + if in_atoms and stripped and not stripped.startswith(";"): + parts = stripped.split() + if len(parts) >= 7: + try: + total += float(parts[6]) + except (ValueError, IndexError): + pass + return total + + +def update_topology_molecules( + top_path: Path, n_water: int, num_na: int, num_cl: int, water_name: str = "SOL" +) -> None: + """Update the [ molecules ] section of a topology file after solvation/ionization. + + Appends water and ion molecule counts to the topology. + + Parameters + ---------- + top_path : Path + Path to the GROMACS .top file (modified in-place). + n_water : int + Number of water molecules. + num_na : int + Number of Na+ ions. + num_cl : int + Number of Cl- ions. + water_name : str + Residue name for water (default "SOL"). + + """ + with open(top_path, "a") as f: + if n_water > 0: + f.write(f"{water_name:<20s} {n_water}\n") + if num_na > 0: + f.write(f"{'NA':<20s} {num_na}\n") + if num_cl > 0: + f.write(f"{'CL':<20s} {num_cl}\n") + + logger.info(f"Topology updated: {n_water} water, {num_na} Na+, {num_cl} Cl-") + + +def validate_with_grompp(topology: Path, structure: Path, mdp: Path, cwd: Path) -> None: + """Run gmx grompp as a validation check for topology/coordinate consistency. + + Parameters + ---------- + topology : Path + Path to topology.top file. + structure : Path + Path to structure .gro file. + mdp : Path + Path to MDP file (e.g., em.mdp). + cwd : Path + Working directory for the command. + + Raises + ------ + RuntimeError + If grompp reports errors. + + """ + gmx_bin = str(check_gmx_available()) + tpr_out = cwd / "check.tpr" + cmd = [ + gmx_bin, + "grompp", + "-f", + str(mdp), + "-c", + str(structure), + "-p", + str(topology), + "-o", + str(tpr_out), + "-maxwarn", + "0", + ] + + result = subprocess.run( + cmd, + cwd=str(cwd), + text=True, + input="", + capture_output=True, + timeout=60, + env=get_gromacs_env(), + check=False, + ) + + # Clean up grompp artifacts + if tpr_out.is_file(): + tpr_out.unlink() + mdout_path = cwd / "mdout.mdp" + if mdout_path.is_file(): + mdout_path.unlink() + + if result.returncode != 0: + raise RuntimeError( + f"gmx grompp validation failed (exit code {result.returncode}).\n" + f"stdout:\n{result.stdout}\n" + f"stderr:\n{result.stderr}" + ) + + logger.info("grompp validation passed.") diff --git a/mdfactory/simulation/openmm_utils.py b/mdfactory/simulation/openmm_utils.py index 126df55..a180ecd 100644 --- a/mdfactory/simulation/openmm_utils.py +++ b/mdfactory/simulation/openmm_utils.py @@ -1,5 +1,5 @@ -# ABOUTME: OpenMM simulation utilities for box compression and equilibration -# ABOUTME: Provides volume convergence monitoring and density-targeted compression +# ABOUTME: OpenMM simulation utilities for box compression, equilibration, and relaxation +# ABOUTME: Provides convergence checks, density-targeted compression, and protein restraints """OpenMM simulation utilities for box compression and equilibration.""" import sys @@ -635,3 +635,121 @@ def compress_equilibrate_bilayer( plt.savefig("equilibration_analysis.png", dpi=300, bbox_inches="tight") return ret + + +def relax_with_protein_restraints( + u, + pdb, + top, + protein_indices, + restraint_k=1000.0, + steps=10000, +): + """Minimize and briefly equilibrate with protein atoms position-restrained. + + Runs a short NPT simulation at 1 bar with harmonic restraints on + the specified protein atom indices, allowing water and ions to relax + around the fixed protein structure. + + Parameters + ---------- + u : mda.Universe + Reference MDAnalysis Universe whose topology is preserved. + pdb : str or Path + Path to the input PDB file for OpenMM. + top : str or Path + Path to the GROMACS topology file. + protein_indices : list of int + 0-based atom indices to restrain (typically protein heavy atoms). + restraint_k : float, optional + Restraint force constant in kJ/mol/nm^2. Default is 1000.0. + steps : int, optional + Number of MD steps after minimization. Default is 10000. + + Returns + ------- + mda.Universe + Universe with relaxed solvent positions. Protein positions + remain at their restrained coordinates. + + """ + print("Loading GROMACS files for protein relaxation...") + gro = app.PDBFile(str(pdb)) + top_file = app.GromacsTopFile(str(top), periodicBoxVectors=gro.topology.getPeriodicBoxVectors()) + + print("Creating OpenMM system...") + system = top_file.createSystem( + nonbondedMethod=app.PME, + nonbondedCutoff=1.0 * unit.nanometer, + constraints=app.HBonds, + ) + + # Position restraints on protein atoms + restraint_force = mm.CustomExternalForce("k*((x-x0)^2+(y-y0)^2+(z-z0)^2)") + restraint_force.addGlobalParameter( + "k", restraint_k * unit.kilojoules_per_mole / unit.nanometer**2 + ) + restraint_force.addPerParticleParameter("x0") + restraint_force.addPerParticleParameter("y0") + restraint_force.addPerParticleParameter("z0") + + for idx in protein_indices: + pos = gro.positions[idx] + restraint_force.addParticle( + idx, + [ + pos[0].value_in_unit(unit.nanometer), + pos[1].value_in_unit(unit.nanometer), + pos[2].value_in_unit(unit.nanometer), + ], + ) + + system.addForce(restraint_force) + + # Isotropic barostat at 1 bar + barostat = mm.MonteCarloBarostat(1.0 * unit.bar, 300 * unit.kelvin) + system.addForce(barostat) + + integrator = mm.LangevinMiddleIntegrator( + 300 * unit.kelvin, 1 / unit.picosecond, 0.002 * unit.picoseconds + ) + + simulation = app.Simulation(top_file.topology, system, integrator) + simulation.context.setPositions(gro.positions) + + simulation.reporters.append( + app.StateDataReporter( + sys.stdout, + 1000, + step=True, + potentialEnergy=True, + temperature=True, + density=True, + volume=True, + ) + ) + + print("Minimizing energy (protein restrained)...") + simulation.minimizeEnergy() + + print(f"Running {steps} steps of NPT relaxation (protein restrained)...") + simulation.step(steps) + + positions = simulation.context.getState(getPositions=True).getPositions() + state = simulation.context.getState() + box_vectors = state.getPeriodicBoxVectors() + simulation.topology.setPeriodicBoxVectors(box_vectors) + + with open("relaxed_system.pdb", "w") as f: + app.PDBFile.writeFile(simulation.topology, positions, f, keepIds=True) + with open("relaxed_system.mmcif", "w") as f: + app.PDBxFile.writeFile(simulation.topology, positions, f, keepIds=True) + + u_tmp = mda.Universe(app.PDBxFile("relaxed_system.mmcif")) + ret = mda.Merge(u.atoms) + ret.atoms.positions = u_tmp.atoms.positions + ret.dimensions = u_tmp.dimensions + ret.atoms.positions = ret.atoms.wrap(compound="residues") + + print("Protein relaxation complete.") + return ret diff --git a/mdfactory/tests/test_proteinbox.py b/mdfactory/tests/test_proteinbox.py new file mode 100644 index 0000000..69023bb --- /dev/null +++ b/mdfactory/tests/test_proteinbox.py @@ -0,0 +1,379 @@ +# ABOUTME: Tests for proteinbox models, topology parsing, and YAML validation +# ABOUTME: Covers ProteinSpecies, ProteinBoxComposition, Pdb2gmxConfig, and BuildInput +"""Tests for proteinbox models, topology parsing, and YAML validation.""" + +import textwrap +from pathlib import Path + +import pytest +import yaml + +from mdfactory.models.composition import ProteinBoxComposition +from mdfactory.models.input import BuildInput +from mdfactory.models.parametrization import Pdb2gmxConfig +from mdfactory.models.species import ProteinSpecies +from mdfactory.setup.protein import ( + _apply_protonation_states, + _build_disulfide_prompt_input, + _sum_charges_from_itp, + check_forcefield_available, + run_pdb2gmx, + update_topology_molecules, +) + + +def _pdb_atom(serial, atom_name, resname, resid, x, y, z): + return ( + f"ATOM {serial:5d} {atom_name:<4s} {resname:>3s} A{resid:4d} " + f"{x:8.3f}{y:8.3f}{z:8.3f} 1.00 0.00\n" + ) + + +class TestProteinSpecies: + def test_basic_creation(self, tmp_path): + pdb = tmp_path / "test.pdb" + pdb.write_text("ATOM 1 N ALA A 1 0.000 0.000 0.000 1.00 0.00\n") + spec = ProteinSpecies(resname="LYZ", pdb_path=pdb) + assert spec.resname == "LYZ" + assert spec.count == 1 + assert spec.fraction == 1.0 + + def test_with_disulfides_and_protonation(self, tmp_path): + pdb = tmp_path / "test.pdb" + pdb.write_text("ATOM 1 N ALA A 1 0.000 0.000 0.000 1.00 0.00\n") + spec = ProteinSpecies( + resname="LYZ", + pdb_path=pdb, + disulfide_bonds=[(6, 127), (30, 115)], + protonation_states={"HIS15": "HIE", "GLU35": "GLH"}, + ) + assert len(spec.disulfide_bonds) == 2 + assert spec.disulfide_bonds[0] == (6, 127) + assert spec.protonation_states["HIS15"] == "HIE" + + def test_charge_not_precomputable(self, tmp_path): + pdb = tmp_path / "test.pdb" + pdb.write_text("ATOM 1 N ALA A 1 0.000 0.000 0.000 1.00 0.00\n") + spec = ProteinSpecies(resname="LYZ", pdb_path=pdb) + with pytest.raises(NotImplementedError, match="pdb2gmx"): + _ = spec.charge + + def test_resname_validation(self, tmp_path): + pdb = tmp_path / "test.pdb" + pdb.write_text("ATOM 1 N ALA A 1 0.000 0.000 0.000 1.00 0.00\n") + with pytest.raises(ValueError, match="Residue name must be less"): + ProteinSpecies(resname="TOOLONG", pdb_path=pdb) + + +class TestProteinBoxComposition: + def test_basic_creation(self, tmp_path): + pdb = tmp_path / "test.pdb" + pdb.write_text("ATOM 1 N ALA A 1 0.000 0.000 0.000 1.00 0.00\n") + comp = ProteinBoxComposition( + protein=ProteinSpecies(resname="LYZ", pdb_path=pdb), + box_padding=12.0, + ) + assert comp.box_padding == 12.0 + assert comp.ionization.neutralize is True + assert comp.ionization.concentration == 0.15 + + def test_custom_ionization(self, tmp_path): + pdb = tmp_path / "test.pdb" + pdb.write_text("ATOM 1 N ALA A 1 0.000 0.000 0.000 1.00 0.00\n") + comp = ProteinBoxComposition( + protein=ProteinSpecies(resname="LYZ", pdb_path=pdb), + ionization={"neutralize": True, "concentration": 0.10}, + ) + assert comp.ionization.concentration == 0.10 + + +class TestPdb2gmxConfig: + def test_defaults(self): + config = Pdb2gmxConfig() + assert config.forcefield == "charmm36m" + assert config.water_model == "tip3p" + assert config.ignh is True + assert config.merge_all is False + + def test_custom(self): + config = Pdb2gmxConfig(forcefield="amber99sb-ildn", water_model="spc") + assert config.forcefield == "amber99sb-ildn" + assert config.water_model == "spc" + + def test_frozen(self): + config = Pdb2gmxConfig() + with pytest.raises(Exception): + config.forcefield = "other" + + +class TestForceFieldCheck: + def test_available_forcefield_passes(self, tmp_path, monkeypatch): + (tmp_path / "charmm27.ff").mkdir() + monkeypatch.setattr( + "mdfactory.setup.protein._get_gmx_search_paths", + lambda: [tmp_path], + ) + check_forcefield_available("charmm27") + + def test_unavailable_forcefield_raises(self, tmp_path, monkeypatch): + monkeypatch.setattr( + "mdfactory.setup.protein._get_gmx_search_paths", + lambda: [tmp_path], + ) + with pytest.raises(ValueError, match="not found"): + check_forcefield_available("nonexistent_ff_xyz") + + def test_error_lists_available_forcefields(self, tmp_path, monkeypatch): + (tmp_path / "charmm27.ff").mkdir() + monkeypatch.setattr( + "mdfactory.setup.protein._get_gmx_search_paths", + lambda: [tmp_path], + ) + with pytest.raises(ValueError, match="charmm27") as exc_info: + check_forcefield_available("nonexistent_ff_xyz") + assert "Available force fields" in str(exc_info.value) + + def test_registered_forcefield_check_does_not_download(self, tmp_path, monkeypatch): + monkeypatch.setattr( + "mdfactory.setup.protein._get_gmx_search_paths", + lambda: [tmp_path], + ) + monkeypatch.setattr( + "mdfactory.setup.protein.download_forcefield", + lambda name: pytest.fail("check_forcefield_available should not download"), + ) + with pytest.raises(ValueError, match="Downloadable"): + check_forcefield_available("charmm36m") + + +class TestBuildInputProteinbox: + def test_yaml_roundtrip(self, tmp_path): + pdb = tmp_path / "test.pdb" + pdb.write_text("ATOM 1 N ALA A 1 0.000 0.000 0.000 1.00 0.00\n") + + yaml_str = f""" +simulation_type: proteinbox +engine: gromacs +parametrization: pdb2gmx +parametrization_config: + type: pdb2gmx + forcefield: charmm36m + water_model: tip3p + ignh: true +system: + protein: + resname: LYZ + count: 1 + pdb_path: {pdb} + disulfide_bonds: + - [6, 127] + - [30, 115] + protonation_states: + HIS15: HIE + box_padding: 12.0 + ionization: + neutralize: true + concentration: 0.15 +""" + data = yaml.safe_load(yaml_str) + inp = BuildInput(**data) + assert inp.simulation_type == "proteinbox" + assert isinstance(inp.system, ProteinBoxComposition) + assert isinstance(inp.parametrization_config, Pdb2gmxConfig) + assert inp.parametrization == "pdb2gmx" + + def test_default_parametrization_config(self, tmp_path): + pdb = tmp_path / "test.pdb" + pdb.write_text("ATOM 1 N ALA A 1 0.000 0.000 0.000 1.00 0.00\n") + + data = { + "simulation_type": "proteinbox", + "parametrization": "pdb2gmx", + "system": { + "protein": {"resname": "LYZ", "count": 1, "pdb_path": str(pdb)}, + }, + } + inp = BuildInput(**data) + assert isinstance(inp.parametrization_config, Pdb2gmxConfig) + assert inp.parametrization_config.forcefield == "charmm36m" + + +class TestTopologyParsing: + def test_sum_charges_from_itp(self, tmp_path): + itp = tmp_path / "protein.itp" + itp.write_text( + textwrap.dedent("""\ + [ moleculetype ] + Protein 3 + + [ atoms ] + ; nr type resnr residue atom cgnr charge mass + 1 NH3 1 LYS N 1 -0.300 14.007 + 2 CT 1 LYS CA 2 0.210 12.011 + 3 C 1 LYS C 3 0.510 12.011 + 4 O 1 LYS O 4 -0.510 15.999 + + [ bonds ] + 1 2 + """) + ) + charge = _sum_charges_from_itp(itp) + assert abs(charge - (-0.090)) < 1e-6 + + def test_update_topology_molecules(self, tmp_path): + top = tmp_path / "topol.top" + top.write_text( + textwrap.dedent("""\ + #include "charmm36m.ff/forcefield.itp" + #include "protein.itp" + + [ system ] + Protein in water + + [ molecules ] + Protein_chain_A 1 + """) + ) + update_topology_molecules(top, n_water=5000, num_na=10, num_cl=18) + content = top.read_text() + assert "SOL" in content + assert "5000" in content + assert "NA" in content + assert "10" in content + assert "CL" in content + assert "18" in content + + def test_apply_protonation_states(self, tmp_path): + pdb = tmp_path / "input.pdb" + pdb.write_text( + textwrap.dedent("""\ + ATOM 1 N HIS A 15 1.000 2.000 3.000 1.00 0.00 + ATOM 2 CA HIS A 15 1.500 2.500 3.500 1.00 0.00 + ATOM 3 N ALA A 16 2.000 3.000 4.000 1.00 0.00 + ATOM 4 N GLU A 35 3.000 4.000 5.000 1.00 0.00 + """) + ) + result = _apply_protonation_states(pdb, {"HIS15": "HIE", "GLU35": "GLH"}, tmp_path) + content = result.read_text() + # HIS at resid 15 should be renamed to HIE + assert "HIE" in content + # GLU at resid 35 should be renamed to GLH + assert "GLH" in content + # ALA at resid 16 should be unchanged + assert "ALA" in content + + def test_apply_protonation_states_translates_charmm_histidine_alias(self, tmp_path): + pdb = tmp_path / "input.pdb" + pdb.write_text(_pdb_atom(1, "N", "HIS", 15, 1.0, 2.0, 3.0)) + result = _apply_protonation_states(pdb, {"HIS15": "HIE"}, tmp_path, forcefield="charmm36m") + assert "HSE" in result.read_text() + + def test_apply_protonation_states_rejects_four_character_names(self, tmp_path): + pdb = tmp_path / "input.pdb" + pdb.write_text(_pdb_atom(1, "N", "GLU", 35, 1.0, 2.0, 3.0)) + with pytest.raises(ValueError, match="cannot be written to PDB"): + _apply_protonation_states(pdb, {"GLU35": "GLUP"}, tmp_path) + + def test_build_disulfide_prompt_input_answers_exact_requested_pairs(self, tmp_path): + pdb = tmp_path / "input.pdb" + pdb.write_text( + "".join( + [ + _pdb_atom(1, "SG", "CYS", 1, 0.0, 0.0, 0.0), + _pdb_atom(2, "SG", "CYS", 4, 2.0, 0.0, 0.0), + _pdb_atom(3, "SG", "CYS", 2, 10.0, 0.0, 0.0), + _pdb_atom(4, "SG", "CYS", 3, 12.0, 0.0, 0.0), + ] + ) + ) + assert _build_disulfide_prompt_input(pdb, [(2, 3)]) == "n\ny\n" + + def test_build_disulfide_prompt_input_rejects_missing_pair(self, tmp_path): + pdb = tmp_path / "input.pdb" + pdb.write_text( + "".join( + [ + _pdb_atom(1, "SG", "CYS", 6, 0.0, 0.0, 0.0), + _pdb_atom(2, "SG", "CYS", 127, 8.0, 0.0, 0.0), + ] + ) + ) + with pytest.raises(ValueError, match="not detected as close CYS SG pairs"): + _build_disulfide_prompt_input(pdb, [(6, 127)]) + + def test_run_pdb2gmx_passes_disulfide_answers_to_subprocess(self, tmp_path, monkeypatch): + pdb = tmp_path / "input.pdb" + pdb.write_text( + "".join( + [ + _pdb_atom(1, "SG", "CYS", 1, 0.0, 0.0, 0.0), + _pdb_atom(2, "SG", "CYS", 4, 2.0, 0.0, 0.0), + _pdb_atom(3, "SG", "CYS", 2, 10.0, 0.0, 0.0), + _pdb_atom(4, "SG", "CYS", 3, 12.0, 0.0, 0.0), + ] + ) + ) + calls = {} + + def fake_run(cmd, cwd, text, input, capture_output, timeout, env=None, check=None): + calls["cmd"] = cmd + calls["cwd"] = cwd + calls["input"] = input + calls["env"] = env + calls["check"] = check + output_dir = Path(cwd) + (output_dir / "processed.gro").write_text("mock gro\n") + (output_dir / "topol.top").write_text("[ atoms ]\n") + (output_dir / "posre.itp").write_text("mock posre\n") + + class Result: + returncode = 0 + stdout = "" + stderr = "" + + return Result() + + monkeypatch.setattr("mdfactory.setup.protein.subprocess.run", fake_run) + monkeypatch.setattr( + "mdfactory.setup.protein.check_gmx_available", lambda: Path("/mock/gmx") + ) + monkeypatch.setattr("mdfactory.setup.protein.check_forcefield_available", lambda ff: None) + monkeypatch.setattr( + "mdfactory.setup.protein.get_gromacs_env", + lambda: {"GMXLIB": "/mock/forcefields"}, + ) + + run_pdb2gmx( + pdb_path=pdb, + config=Pdb2gmxConfig(forcefield="amber99sb-ildn", water_model="tip3p"), + disulfide_bonds=[(2, 3)], + protonation_states={}, + output_dir=tmp_path / "pdb2gmx_output", + ) + + assert "-ss" in calls["cmd"] + assert calls["cmd"][:2] == ["/mock/gmx", "pdb2gmx"] + assert calls["input"] == "n\ny\n" + assert calls["cwd"] == str((tmp_path / "pdb2gmx_output").resolve()) + assert calls["env"] == {"GMXLIB": "/mock/forcefields"} + assert calls["check"] is False + + +class TestMetadata: + def test_proteinbox_metadata(self, tmp_path): + pdb = tmp_path / "test.pdb" + pdb.write_text("ATOM 1 N ALA A 1 0.000 0.000 0.000 1.00 0.00\n") + data = { + "simulation_type": "proteinbox", + "parametrization": "pdb2gmx", + "system": { + "protein": {"resname": "LYZ", "count": 1, "pdb_path": str(pdb)}, + "box_padding": 12.0, + }, + } + inp = BuildInput(**data) + meta = inp.metadata + assert meta["simulation_type"] == "proteinbox" + assert meta["total_count"] == 1 + assert "box_padding" in meta["system_specific"] + assert meta["system_specific"]["box_padding"] == 12.0 diff --git a/mdfactory/tests/test_settings.py b/mdfactory/tests/test_settings.py index 0eb0159..b918aac 100644 --- a/mdfactory/tests/test_settings.py +++ b/mdfactory/tests/test_settings.py @@ -25,6 +25,7 @@ def test_defaults_loaded(): assert "csv" in s.config assert "foundry" in s.config assert "cgenff" in s.config + assert "gromacs" in s.config assert "storage" in s.config assert s.config["database"]["TYPE"] == "sqlite" @@ -53,6 +54,25 @@ def test_cgenff_dir_default(): assert s.cgenff_dir is not None +def test_gromacs_env_prepends_forcefield_dir(tmp_path, monkeypatch): + """gromacs_env should add the configured force field directory to GMXLIB.""" + monkeypatch.setenv("MDFACTORY_DATA_DIR", str(tmp_path / "data")) + s = Settings() + env = s.gromacs_env({"GMXLIB": "/existing/path"}) + assert env["GMXLIB"].split(":") == [ + str(tmp_path / "data" / "forcefields"), + "/existing/path", + ] + + +def test_config_template_documents_gromacs_section(): + """The sample config should expose all user-facing GROMACS settings.""" + template = Path("config_templates/config.ini").read_text() + assert "[gromacs]" in template + assert "GMX_PATH =" in template + assert "FORCEFIELD_DIR =" in template + + def test_reload(): """reload() should re-read config from disk.""" s = Settings() diff --git a/mdfactory/tests/test_sync_config_local_paths.py b/mdfactory/tests/test_sync_config_local_paths.py index b09bef8..e4a3127 100644 --- a/mdfactory/tests/test_sync_config_local_paths.py +++ b/mdfactory/tests/test_sync_config_local_paths.py @@ -89,12 +89,16 @@ def fake_initialize(): ) # Mock questionary prompts in order: - # 1. confirm CGenFF -> False - # 2. text parameter store -> default - # 3. select backend -> "sqlite" - # 4. text RUN_DB_PATH -> "runs.db" - # 5. text ANALYSIS_DB_PATH -> "analysis.db" - # 6. confirm initialize -> True + # 1. confirm GROMACS -> True + # 2. text GMX_PATH -> "" + # 3. text FORCEFIELD_DIR -> default + # 4. confirm force field download -> False + # 5. confirm CGenFF -> False + # 6. text parameter store -> default + # 7. select backend -> "sqlite" + # 8. text RUN_DB_PATH -> "runs.db" + # 9. text ANALYSIS_DB_PATH -> "analysis.db" + # 10. confirm initialize -> True class FakeQuestion: def __init__(self, value): self._value = value @@ -102,8 +106,16 @@ def __init__(self, value): def ask(self): return self._value - answers_confirm = iter([False, True]) - answers_text = iter([str(tmp_path / "data" / "parameters"), "runs.db", "analysis.db"]) + answers_confirm = iter([True, False, False, True]) + answers_text = iter( + [ + "", + str(tmp_path / "data" / "forcefields"), + str(tmp_path / "data" / "parameters"), + "runs.db", + "analysis.db", + ] + ) answers_select = iter(["sqlite"]) monkeypatch.setattr( @@ -122,4 +134,10 @@ def ask(self): run_config_wizard() assert called["value"] is True - assert (tmp_path / "config.ini").exists() + config_path = tmp_path / "config.ini" + assert config_path.exists() + + config = configparser.ConfigParser() + config.read(config_path) + assert config["gromacs"]["GMX_PATH"] == "" + assert config["gromacs"]["FORCEFIELD_DIR"] == str(tmp_path / "data" / "forcefields") diff --git a/mdfactory/utils/sync_config.py b/mdfactory/utils/sync_config.py index 9792998..c077f0d 100644 --- a/mdfactory/utils/sync_config.py +++ b/mdfactory/utils/sync_config.py @@ -285,6 +285,23 @@ def run_config_wizard() -> None: logger.info("Using existing user config. No changes made.") return + # GROMACS setup (optional) + use_gromacs = questionary.confirm("Do you use GROMACS (pdb2gmx)?", default=True).ask() + gmx_path = "" + forcefield_dir = str(data_dir / "forcefields") + if use_gromacs: + gmx_path = questionary.text( + "Path to gmx binary (leave empty to use PATH):", + default="", + ).ask() + forcefield_dir = questionary.text( + "Force field download directory:", + default=str(data_dir / "forcefields"), + ).ask() + download_ffs = questionary.confirm( + "Download CHARMM36m force field now?", default=True + ).ask() + # CGenFF setup (optional) use_cgenff = questionary.confirm("Do you use CGenFF (SILCSBIO)?", default=False).ask() silcsbiodir = "" @@ -312,6 +329,8 @@ def run_config_wizard() -> None: for key, value in options.items(): config[section][key] = str(value) + config["gromacs"]["GMX_PATH"] = gmx_path + config["gromacs"]["FORCEFIELD_DIR"] = normalize_local_path(forcefield_dir) config["cgenff"]["SILCSBIODIR"] = silcsbiodir config["storage"]["PARAMETERS"] = normalize_local_path(param_dir) config["database"]["TYPE"] = backend @@ -330,6 +349,14 @@ def run_config_wizard() -> None: cfg.ensure_dirs() cfg.reload() + # Download force fields if requested + if use_gromacs and download_ffs: + from ..setup.protein import download_all_forcefields + + logger.info("Downloading registered force fields...") + download_all_forcefields() + logger.success("Force fields downloaded.") + init_default = backend in {"sqlite", "csv"} if not questionary.confirm( "Initialize databases now?", diff --git a/mdfactory/workflows.py b/mdfactory/workflows.py index 9cd4b34..e62ced2 100644 --- a/mdfactory/workflows.py +++ b/mdfactory/workflows.py @@ -5,7 +5,7 @@ from pathlib import Path from typing import Any -from .build import build_bilayer, build_lnp, build_mixedbox +from .build import build_bilayer, build_lnp, build_mixedbox, build_proteinbox from .models.input import BuildInput from .utils.utilities import load_yaml_file @@ -13,6 +13,7 @@ "mixedbox": build_mixedbox, "bilayer": build_bilayer, "lnp": build_lnp, + "proteinbox": build_proteinbox, }