Source code for amorphgen.pipeline.run_pipeline

"""
amorphgen.pipeline.run_pipeline
--------------------------------
Orchestrates the full melt-and-quench pipeline.

Usage
-----
    from amorphgen import MeltQuenchPipeline

    # Default MACE model
    pipe = MeltQuenchPipeline(
        input_file="POSCAR",
        work_dir="my_run",
        cfg_override={"model": "mace-mpa-0", "device": "cuda"},
    )
    final_atoms = pipe.run()

    )

    # CHGNet
    pipe = MeltQuenchPipeline(
        input_file="POSCAR",
        cfg_override={"model": "chgnet"},
    )

    # Custom fine-tuned MACE model
    pipe = MeltQuenchPipeline(
        input_file="POSCAR",
        cfg_override={"model_path": "/data/models/InO_finetuned.model"},
    )

Resuming from a checkpoint
--------------------------
    pipe.run(stages=[5, 6, 7], input_file="stage4_eq_high.xyz")
"""

import os
import time
import platform
from datetime import datetime

from ase.io import read

from . import opt_cell, melt_cell, equilibrate, quench, final_opt
from ..utils import get_calculator, merge_config
from ..configs import DEFAULT_CONFIG


[docs] class MeltQuenchPipeline: """ End-to-end melt-and-quench pipeline for amorphous structure generation. Parameters ---------- input_file : str Path to the starting crystalline structure (any ASE-readable format). work_dir : str Directory where all output files are written. Created if absent. cfg_override : dict, optional Any keys in DEFAULT_CONFIG to override, including: * ``"model"`` : foundation model short name (any backend) * ``"model_path"`` : path to local .model file (overrides model) * ``"device"`` : ``"cuda"`` or ``"cpu"`` * ``"eq_premelt"`` : ``{"ensemble": "NVT" or "NPT", ...}`` * ``"melt"`` : ``{"ensemble": "NVT" or "NPT", ...}`` * ``"quench"`` : ``{"ensemble": "NVT" or "NPT", ...}`` * ``"eq_high"`` : ``{"ensemble": "NVT" or "NPT", ...}`` * ``"eq_low"`` : ``{"ensemble": "NVT" or "NPT", ...}`` share_calc : bool If True, one calculator is shared across all stages. """ STAGE_NAMES = { 1: "Structure optimisation (crystalline)", 2: "Pre-melt equilibration (300 K)", 3: "Melt (heat ramp)", 4: "High-T equilibration", 5: "Quench (cooling ramp)", 6: "Low-T equilibration", 7: "Final optimisation (amorphous)", } # Checkpoint files written by each stage (checked in reverse order for resume) STAGE_CHECKPOINTS = { 1: "stage1_opt.xyz", 2: "stage2_eq.xyz", 3: "stage3_melted.xyz", 4: "stage4_eq.xyz", 5: "stage5_quenched.xyz", 6: "stage6_eq.xyz", 7: "stage7_opt.xyz", } def __init__(self, input_file: str, work_dir: str = "melt_quench_run", cfg_override: dict | None = None, share_calc: bool = True): self.input_file = input_file self.work_dir = work_dir self.cfg = merge_config(DEFAULT_CONFIG, cfg_override) self.share_calc = share_calc self._calc = None self._orig_dir = os.getcwd() os.makedirs(work_dir, exist_ok=True) # Handle legacy "mace_model" key → "model" if self.cfg.get("mace_model") and not self.cfg.get("model"): self.cfg["model"] = self.cfg["mace_model"] # ───────────────────────────────────────────────────────────────────────── def _get_calc(self): """Build or return the shared calculator.""" if self._calc is None or not self.share_calc: device = self.cfg.get("device", "cuda") if device == "auto": import torch device = "cuda" if torch.cuda.is_available() else "cpu" calc_kwargs = {} if self.cfg.get("classical_params"): calc_kwargs["classical_params"] = self.cfg["classical_params"] self._calc = get_calculator( model=self.cfg.get("model", "mace-mpa-0"), device=device, model_path=self.cfg.get("model_path"), **calc_kwargs, ) return self._calc # ───────────────────────────────────────────────────────────────────────── def _find_resume_point(self, stages: list[int]) -> tuple[list[int], str]: """ Scan work_dir for completed stage checkpoints and return the remaining stages and the input file to resume from. Parameters ---------- stages : list of int The originally requested stages. Returns ------- remaining_stages : list of int Stages that still need to run. resume_input : str Path to the checkpoint file to resume from, or the original input_file if no checkpoints are found. """ resume_input = self.input_file remaining_stages = list(stages) # Walk stages in order; if a checkpoint exists, advance past it for s in stages: checkpoint = self.STAGE_CHECKPOINTS.get(s) if checkpoint is None: break checkpoint_path = os.path.join(self.work_dir, checkpoint) if os.path.isfile(checkpoint_path): resume_input = checkpoint_path remaining_stages.remove(s) else: break # first missing checkpoint → start here return remaining_stages, resume_input # ─────────────────────────────────────────────────────────────────────────
[docs] def run(self, stages: list[int] | None = None, input_file: str | None = None, resume: bool = False) -> object: """ Execute the pipeline. Parameters ---------- stages : list of int, optional Which stages to run (default: all, i.e. ``[1, 2, 3, 4, 5, 6, 7]``). input_file : str, optional Override the input structure file (useful for resuming from a mid-pipeline checkpoint). resume : bool If True, scan work_dir for completed stage checkpoints and skip stages that already have output files. Automatically determines which stage to resume from and which input file to use. Returns ------- ase.Atoms The final optimised amorphous structure. """ if stages is None: stages = [1, 2, 3, 4, 5, 6, 7] if input_file is None: input_file = self.input_file if resume: remaining, resume_input = self._find_resume_point(stages) if not remaining: print(f" All stages already completed in {self.work_dir}/") final_checkpoint = os.path.join( self.work_dir, self.STAGE_CHECKPOINTS[stages[-1]], ) return read(final_checkpoint) if remaining != stages: skipped = [s for s in stages if s not in remaining] print(f" Resuming: skipping completed stages {skipped}") print(f" Starting from stage {remaining[0]} " f"(input: {os.path.basename(resume_input)})") input_file = resume_input stages = remaining atoms = read(input_file) calc = self._get_calc() atoms.calc = calc model_name = self.cfg.get("model", "mace-mpa-0") model_path = self.cfg.get("model_path") model_display = model_path if model_path else model_name device = self.cfg.get("device", "cuda") n_atoms = len(atoms) formula = atoms.get_chemical_formula(mode="hill") bar = "=" * 65 print(f"\n{bar}") from .. import __version__ print(f" AmorphGen v{__version__} - Melt-and-Quench Pipeline") from ..utils.common import compute_density_gcm3 density = compute_density_gcm3(atoms) print(f" Model: {model_display}") print(f" Device: {device}") print(f" Input: {input_file}") print(f" System: {formula} ({n_atoms} atoms)") print(f" Density: {density:.2f} g/cm3") print(f" Stages: {stages}") print(f" Output: {self.work_dir}/") print(f"{bar}\n") os.chdir(self.work_dir) t0 = time.time() stage_timings = [] try: for s in stages: name = self.STAGE_NAMES.get(s, f"Stage {s}") print(f"\n{'-' * 65}") print(f" Stage {s}: {name}") print(f"{'-' * 65}\n") t_stage = time.time() if s == 1: atoms = opt_cell.run(atoms, self.cfg, calc) elif s == 2: atoms = equilibrate.run(atoms, self.cfg, calc, stage="premelt") elif s == 3: atoms = melt_cell.run(atoms, self.cfg, calc) elif s == 4: atoms = equilibrate.run(atoms, self.cfg, calc, stage="high") elif s == 5: atoms = quench.run(atoms, self.cfg, calc) elif s == 6: atoms = equilibrate.run(atoms, self.cfg, calc, stage="low") elif s == 7: atoms = final_opt.run(atoms, self.cfg, calc) else: print(f" WARNING: Unknown stage {s} - skipping.") continue dt = time.time() - t_stage stage_timings.append((s, name, dt)) d = compute_density_gcm3(atoms) print(f" [Stage {s} completed in {dt:.1f} s ({dt/60:.1f} min) " f"| density={d:.2f} g/cm3]") finally: os.chdir(self._orig_dir) elapsed = time.time() - t0 # Print summary print(f"\n{bar}") print(f" Pipeline complete ({elapsed / 60:.1f} min)") print(f" Output directory: {self.work_dir}/") print(f"{bar}\n") # Write pipeline summary log self._write_summary_log( input_file, formula, n_atoms, model_display, device, stages, stage_timings, elapsed ) return atoms
def _write_summary_log(self, input_file, formula, n_atoms, model_display, device, stages, stage_timings, total_elapsed): """Write a pipeline_summary.log file with timing and config.""" logfile = os.path.join(self.work_dir, "pipeline_summary.log") bar = "=" * 65 with open(logfile, "w") as f: f.write(f"{bar}\n") f.write(f" AmorphGen — Pipeline Summary\n") f.write(f" Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f" Platform: {platform.platform()}\n") f.write(f" Python: {platform.python_version()}\n") f.write(f"{bar}\n\n") f.write(f" System: {formula} ({n_atoms} atoms)\n") f.write(f" Model: {model_display}\n") f.write(f" Device: {device}\n") f.write(f" Input: {input_file}\n") f.write(f" Output: {self.work_dir}/\n") f.write(f" Stages: {stages}\n\n") # Stage timings f.write(f" {'Stage':<45} {'Time (s)':>10} {'Time (min)':>12}\n") f.write(f" {'-'*69}\n") for s, name, dt in stage_timings: f.write(f" {s}. {name:<42} {dt:>10.1f} {dt/60:>11.1f}\n") f.write(f" {'-'*69}\n") f.write(f" {'Total':<45} {total_elapsed:>10.1f} " f"{total_elapsed/60:>11.1f}\n\n") # Per-atom timing if n_atoms > 0: f.write(f" Per-atom total: {total_elapsed/n_atoms:.2f} s/atom\n") for s, name, dt in stage_timings: f.write(f" Per-atom stage {s}: {dt/n_atoms:.2f} s/atom\n") f.write(f"\n") # Key config parameters f.write(f" Configuration:\n") for key in ["opt", "eq_premelt", "melt", "eq_high", "quench", "eq_low"]: if key in self.cfg: f.write(f" {key}:\n") for k, v in self.cfg[key].items(): f.write(f" {k}: {v}\n") f.write(f"\n{bar}\n") print(f" Summary log: {logfile}")