Source code for amorphgen.pipeline.random_gen

"""
amorphgen.pipeline.random_gen
------------------------------
Generate amorphous structures by random atom placement with
minimum-separation constraints, then optionally relax with a
foundation model.

This provides an alternative to the melt-and-quench route: instead of
melting a crystal and cooling it, we place atoms randomly inside a box
subject to pairwise distance constraints, then optimise the structure.

Data tables (Shannon ionic radii, metallic radii, electronegativities,
element classification) and helper functions (bond classification,
minsep calculation, density estimation) are in amorphgen.utils.radii.
"""

from __future__ import annotations

import logging
import os
import importlib
import time
import numpy as np
from ase import Atoms
from ase.io import write
from ase.data import atomic_masses, atomic_numbers

from ..utils.radii import (
    # Data tables (re-exported for backward compatibility)
    SHANNON_IONIC_RADII, METALLIC_RADII, PAULING_EN,
    NONMETALS, METALLOIDS, ELEMENTAL_DENSITIES, SCALE_FACTORS,
    # Functions (aliased with underscore for internal use)
    classify_bond as _classify_bond,
    get_ionic_radius as _get_ionic_radius,
    get_metallic_radius as _get_metallic_radius,
    get_effective_radius as _get_effective_radius,
    default_minsep as _default_minsep,
    estimate_density as _estimate_density,
    estimate_cell_length as _estimate_cell_length,
    auto_target_cn as _auto_target_cn,
    format_auto_derive_summary as _format_auto_derive_summary,
)

logger = logging.getLogger(__name__)


# ==============================================================================
# Internal helpers
# ==============================================================================

def _get_minsep(s1: str, s2: str, minsep: dict) -> float:
    """Look up the minimum separation for a pair of species."""
    key1 = f"{s1}-{s2}"
    key2 = f"{s2}-{s1}"
    return minsep.get(key1, minsep.get(key2, 1.5))


# ==============================================================================
# Optimizer and cell filter helpers
# ==============================================================================

def _get_optimizer_class(name: str):
    """Import and return an ASE optimizer class by name."""
    optimizers = {
        "LBFGS":          ("ase.optimize", "LBFGS"),
        "FIRE":           ("ase.optimize", "FIRE"),
        "BFGSLineSearch": ("ase.optimize", "BFGSLineSearch"),
        "BFGS":           ("ase.optimize", "BFGS"),
        "MDMin":          ("ase.optimize", "MDMin"),
    }
    if name not in optimizers:
        raise ValueError(f"Unknown optimizer '{name}'. Choose from: {', '.join(optimizers)}")
    module_path, cls_name = optimizers[name]
    module = importlib.import_module(module_path)
    return getattr(module, cls_name)


def _build_cell_filter(atoms, cell_filter: str):
    """Wrap atoms in the requested cell filter for optimisation."""
    if cell_filter == "none" or cell_filter is None:
        return atoms
    elif cell_filter == "cubic":
        from ase.filters import ExpCellFilter
        return ExpCellFilter(atoms, hydrostatic_strain=True)
    elif cell_filter == "ExpCellFilter":
        from ase.filters import ExpCellFilter
        return ExpCellFilter(atoms)
    elif cell_filter == "StrainFilter":
        from ase.filters import StrainFilter
        return StrainFilter(atoms)
    elif cell_filter == "UnitCellFilter":
        from ase.filters import UnitCellFilter
        return UnitCellFilter(atoms)
    else:
        # Default: FrechetCellFilter (better convergence for non-cubic)
        from ase.filters import FrechetCellFilter
        return FrechetCellFilter(atoms)


# ==============================================================================
# Coordination-aware placement helpers
# ==============================================================================

def _get_dmax(s1: str, s2: str, dmax: dict) -> float:
    """Look up the maximum bond distance for a pair of species."""
    key1 = f"{s1}-{s2}"
    key2 = f"{s2}-{s1}"
    return dmax.get(key1, dmax.get(key2, 0.0))


def _auto_dmax(minsep: dict, target_cn: dict, factor: float = 1.5) -> dict:
    """
    Auto-generate dmax from minsep for bonding pairs.

    A bond is defined as minsep <= d <= dmax, so dmax = minsep * factor
    gives a reasonable bonding shell width.

    Coverage:
    - Ionic bonds (M-O, M-Cl): dmax = minsep * 1.5 (primary bonding)
    - Covalent cross-element (Si-Ge): dmax = minsep * 1.5
    - Metallic bonds (M-M): dmax = minsep * 1.3 (only when M-M is
      the primary bond, i.e. alloys/pure metals with no anions)
    - Pure elements (Si, Ge, Sn, etc.): dmax = minsep * 1.2
    """
    dmax = {}
    cn_elements = set(target_cn.keys())

    # Check if pure element or alloy (no anion present)
    all_elements = set()
    for pair in minsep:
        s1, s2 = pair.split("-")
        all_elements.add(s1)
        all_elements.add(s2)
    has_anion = any(s in NONMETALS for s in all_elements)
    is_pure = all(p.split("-")[0] == p.split("-")[1] for p in minsep)

    for pair, dist in minsep.items():
        s1, s2 = pair.split("-")

        if s1 == s2:
            # Pure elements: tight dmax so atoms are placed close enough
            # for MLIP to relax into proper bonds
            if is_pure and s1 in cn_elements:
                dmax[pair] = dist * 1.2
            continue

        # At least one element must have a target CN
        if s1 not in cn_elements and s2 not in cn_elements:
            continue

        bond_type = _classify_bond(s1, s2)
        if bond_type == "ionic" or bond_type == "covalent":
            # Primary bonding (M-O, M-Cl, Si-Ge)
            dmax[pair] = dist * factor
        elif bond_type == "metallic" and not has_anion:
            # M-M bonds in alloys/pure metals (no anion context)
            # Don't add M-M dmax in ionic compounds (In-In in In2O3)
            # because it creates conflicting CN constraints
            dmax[pair] = dist * factor

    return dmax


def _update_cn_array(cn_array: np.ndarray, new_idx: int,
                     positions: np.ndarray, n_placed: int,
                     placed_type_idx: np.ndarray, sym_idx: int,
                     dmax_table: np.ndarray, L: float, pbc: bool):
    """Incrementally update the CN array after placing a new atom (vectorized)."""
    pos_new = positions[new_idx]

    # Build indices of all OTHER placed atoms
    others = np.arange(n_placed)
    others = others[others != new_idx]
    if len(others) == 0:
        cn_array[new_idx] = 0
        return

    # Vectorized distance computation (squared — avoid sqrt)
    d_vec = pos_new - positions[others]
    if pbc:
        d_vec -= L * np.round(d_vec / L)
    d_sq = np.sum(d_vec * d_vec, axis=1)

    # Lookup dmax^2 from precomputed table
    dm_sq = dmax_table[placed_type_idx[others], sym_idx]

    # Find bonded neighbours: dmax > 0 and dist^2 <= dmax^2
    bonded = (dm_sq > 0) & (d_sq <= dm_sq)

    cn_array[new_idx] = int(np.sum(bonded))
    cn_array[others[bonded]] += 1


def _repair_undercoordination(
    positions: np.ndarray,
    n_placed: int,
    placed_type_idx: np.ndarray,
    cn_array: np.ndarray,
    target_cn_arr: np.ndarray,
    cn_tolerance: int,
    minsep_sq_table: np.ndarray,
    dmax_sq_table: np.ndarray,
    L: float,
    pbc: bool,
    max_iter: int,
    rng: np.random.Generator,
) -> int:
    """**Experimental** post-placement repair pass for under-coordinated atoms.

    Greedy single-atom relocation: each iteration picks one
    under-coordinated atom and proposes a new position inside the bonding
    shell of another under-coordinated atom; the move is accepted if it
    reduces the total under-coord count without violating minsep or
    over-coordinating any neighbour.  Iterates until all atoms reach
    target CN, or ``max_iter`` proposals are exhausted.

    This addresses a fundamental limitation of one-pass greedy
    placement: once an atom is placed it stays put, so atoms placed
    "too late" can end up under-coordinated even when geometrically a
    better arrangement exists.  Repair imitates (poorly) what an MD
    anneal would do continuously, but at the cost of a sequential
    Monte-Carlo-style loop.  Default off because:
      * doesn't beat the hybrid workflow in quality
      * adds 5-30 s of placement time per structure
      * not a substitute for MD on tetrahedral semiconductors

    Returns the final number of under-coordinated atoms.
    """
    def _under_count(cn):
        return int(np.sum(cn[:n_placed] < target_cn_arr[placed_type_idx[:n_placed]]))

    # Bond-length sweet spot per type-pair: midpoint of (sqrt(minsep), sqrt(dmax)).
    minsep_table = np.sqrt(minsep_sq_table)
    dmax_table = np.sqrt(dmax_sq_table)
    target_dist_table = 0.5 * (minsep_table + dmax_table)

    initial_under = _under_count(cn_array)
    if initial_under == 0:
        return 0

    accepted = 0
    rejected = 0
    for it in range(max_iter):
        # Identify under-coordinated atoms.
        under_mask = (cn_array[:n_placed] < target_cn_arr[placed_type_idx[:n_placed]])
        under_idx = np.where(under_mask)[0]
        if len(under_idx) == 0:
            break

        # Pick one to move; sample partner from remaining under-coord atoms.
        idx = int(rng.choice(under_idx))
        partners = under_idx[under_idx != idx]
        if len(partners) > 0:
            partner = int(rng.choice(partners))
            t_target = float(target_dist_table[
                placed_type_idx[idx], placed_type_idx[partner]
            ])
            direction = rng.standard_normal(3)
            direction /= np.linalg.norm(direction)
            new_pos = positions[partner] + direction * t_target
        else:
            # No other under-coord atom; perturb existing position.
            new_pos = positions[idx] + rng.standard_normal(3) * 0.5

        # Wrap into cell.
        new_pos = new_pos - L * np.floor(new_pos / L) if pbc else new_pos

        # Minsep check against all other placed atoms.
        others_idx = np.arange(n_placed)
        others_idx = others_idx[others_idx != idx]
        d_vec = new_pos - positions[others_idx]
        if pbc:
            d_vec -= L * np.round(d_vec / L)
        d_sq = np.sum(d_vec * d_vec, axis=1)
        min_sq = minsep_sq_table[placed_type_idx[idx], placed_type_idx[others_idx]]
        if np.any(d_sq < min_sq):
            rejected += 1
            continue

        # Compute proposed neighbour list (bonded set) for the new position.
        dm_sq = dmax_sq_table[placed_type_idx[idx], placed_type_idx[others_idx]]
        new_bonded = (dm_sq > 0) & (d_sq <= dm_sq)
        new_neighbour_count = int(np.sum(new_bonded))

        # Old bonded set (so we know whose CN to decrement after move).
        old_pos = positions[idx].copy()
        d_vec_old = old_pos - positions[others_idx]
        if pbc:
            d_vec_old -= L * np.round(d_vec_old / L)
        d_sq_old = np.sum(d_vec_old * d_vec_old, axis=1)
        old_bonded = (dm_sq > 0) & (d_sq_old <= dm_sq)

        # Tentative CN array after move.
        cn_trial = cn_array.copy()
        cn_trial[others_idx[old_bonded]] -= 1
        cn_trial[others_idx[new_bonded]] += 1
        cn_trial[idx] = new_neighbour_count

        # Reject if any neighbour is now over-coordinated beyond tolerance.
        target_neighbours = target_cn_arr[placed_type_idx[others_idx[new_bonded]]]
        new_neighbour_cn = cn_trial[others_idx[new_bonded]]
        if np.any(new_neighbour_cn > target_neighbours + cn_tolerance):
            rejected += 1
            continue
        if cn_trial[idx] > target_cn_arr[placed_type_idx[idx]] + cn_tolerance:
            rejected += 1
            continue

        # Accept iff total under-coord count strictly drops.
        new_under = _under_count(cn_trial)
        cur_under = _under_count(cn_array)
        if new_under < cur_under:
            positions[idx] = new_pos
            cn_array[:] = cn_trial
            accepted += 1
        else:
            rejected += 1

    final_under = _under_count(cn_array)
    logger.info(
        "  Repair pass: %d/%d proposals accepted; under-coord %d -> %d",
        accepted, accepted + rejected, initial_under, final_under,
    )
    return final_under


# ==============================================================================
# Structure generation
# ==============================================================================

[docs] def generate_random( composition: dict[str, int], cell_length_ang: float | None = None, target_density: float | None = None, density_scale: float = 1.0, minsep: dict[str, float] | None = None, minsep_scale: float = 0.85, seed: int | None = None, max_attempts_per_atom: int = 700000, pbc: bool = True, target_cn: dict[str, int] | None = None, dmax: dict[str, float] | None = None, cn_tolerance: int | None = None, dmax_factor: float = 1.5, repair_iters: int = 0, ) -> Atoms: """ Generate a single random structure. Parameters ---------- composition : dict Atom counts per element, e.g. {"In": 32, "O": 48} for 80-atom In2O3. The CLI also accepts formula format (``In2O3*16``) which is converted to this dict form automatically. cell_length_ang : float, optional Cubic cell edge length. If None, estimated from target_density. target_density : float, optional Target density in g/cm3 for cell size estimation. If supplied, ``density_scale`` is ignored. density_scale : float, default 1.0 Multiplier applied to the *auto-estimated* density (sphere-packing or elemental-mixing path) before cell sizing. Useful for tight-network compositions (covalent semiconductors, metallic glasses) where the sphere-packing model underestimates the equilibrium amorphous density by 15-25%; setting ``density_scale=1.2`` boosts the auto density 20%. Has no effect when ``target_density`` is supplied explicitly. minsep : dict, optional Minimum pair separations, e.g. {"In-In": 2.8, "In-O": 1.9}. If None, auto-generated from Shannon ionic / metallic / covalent radii with bonding-type-aware scale factors. minsep_scale : float Fallback scale factor for default minsep (default 0.85). seed : int, optional Random seed for reproducibility. max_attempts_per_atom : int Max placement attempts per atom before raising an error. pbc : bool Periodic boundary conditions. target_cn : dict, optional Target coordination numbers, e.g. {"Si": 4, "O": 2}. Enables coordination-aware placement (atoms biased toward existing under-coordinated sites within the bonding shell). Also uses CN-specific Shannon radii for tighter minsep. dmax : dict, optional Maximum bond distances (defines "bonded"), e.g. {"Si-O": 2.0}. Auto-generated from minsep * 1.5 if not provided. cn_tolerance : int Over-coordination tolerance for coordination-aware placement. Default 0 (strict: reject if any neighbour is at target CN). Set to 1 to allow temporary +1 over-coordination for tighter CN matching. dmax_factor : float Multiplier for auto dmax: dmax = minsep * dmax_factor (default 1.5). Controls the width of the bonding shell. Ignored if dmax is provided explicitly. repair_iters : int, default 0 **Experimental.** If > 0, run a post-placement repair loop that relocates under-coordinated atoms within the bonding shell of other under-coordinated atoms, accepting moves that reduce the total under-coord count. Useful for tetrahedral covalent networks (a-Si, a-Ge) where greedy placement leaves many atoms below target CN. Default 0 (off). See :func:`_repair_undercoordination` for caveats. Returns ------- ase.Atoms """ rng = np.random.default_rng(seed) symbols = [] for species, count in composition.items(): symbols.extend([species] * count) n_atoms = len(symbols) rng.shuffle(symbols) # Auto-detect target CN and tolerance if not provided # Empty dict {} means user explicitly disabled coordination-aware mode (--no-sc) if target_cn is None: target_cn, auto_tol = _auto_target_cn(composition) if cn_tolerance is None: cn_tolerance = auto_tol elif target_cn == {}: target_cn = None # disable coordination-aware placement # If cn_tolerance still None, default 0 if cn_tolerance is None: cn_tolerance = 0 if cell_length_ang is None: cell_length_ang = _estimate_cell_length( composition, target_density, density_scale=density_scale, ) L = cell_length_ang if minsep is None: minsep = _default_minsep(symbols, scale=minsep_scale, target_cn=target_cn) # Coordination-aware mode: auto-generate dmax from minsep if not provided use_sc = target_cn is not None if use_sc and dmax is None: dmax = _auto_dmax(minsep, target_cn, factor=dmax_factor) logger.info("Auto-generated dmax from minsep * 1.5: %s", dmax) # CN tracking array (only used in coordination-aware mode) cn_array = np.zeros(n_atoms, dtype=int) if use_sc else None positions = np.empty((n_atoms, 3)) placed_symbols = [] n_placed = 0 # Precompute lookup tables for O(1) access (indexed by element type) unique_syms = sorted(set(symbols)) _sym_to_idx = {s: i for i, s in enumerate(unique_syms)} _n_types = len(unique_syms) # Minsep tables _minsep_sq_table = np.zeros((_n_types, _n_types)) for s1 in unique_syms: for s2 in unique_syms: d = _get_minsep(s1, s2, minsep) _minsep_sq_table[_sym_to_idx[s1], _sym_to_idx[s2]] = d * d # Dmax table (for coordination-aware mode) _dmax_table = np.zeros((_n_types, _n_types)) _dmax_sq_table = np.zeros((_n_types, _n_types)) if use_sc and dmax: for s1 in unique_syms: for s2 in unique_syms: d = _get_dmax(s1, s2, dmax) _dmax_table[_sym_to_idx[s1], _sym_to_idx[s2]] = d _dmax_sq_table[_sym_to_idx[s1], _sym_to_idx[s2]] = d * d # Target CN + tolerance table (for coordination-aware mode) _target_cn_arr = np.full(_n_types, 999, dtype=int) if use_sc and target_cn: for s, cn_val in target_cn.items(): if s in _sym_to_idx: _target_cn_arr[_sym_to_idx[s]] = cn_val # Track integer type index for each placed atom placed_type_idx = np.empty(n_atoms, dtype=int) import time as _time _t_start = _time.time() _last_progress = _t_start for i, sym in enumerate(symbols): sym_idx = _sym_to_idx[sym] if n_placed > 0: min_dists_sq = _minsep_sq_table[sym_idx, placed_type_idx[:n_placed]] placed = False # -- Coordination-aware path -- # Find seeds that need more bonds (vectorized). if use_sc and n_placed > 0: types_placed = placed_type_idx[:n_placed] dm_to_new = _dmax_table[types_placed, sym_idx] tgt_placed = _target_cn_arr[types_placed] cn_placed = cn_array[:n_placed] # Bondable: has dmax > 0, target > 0, cn < target + tolerance bondable_mask = ( (dm_to_new > 0) & (tgt_placed < 999) & (cn_placed < tgt_placed + cn_tolerance) ) bondable_idx = np.where(bondable_mask)[0] if len(bondable_idx) > 0: # Sort by deficit (most under-coordinated first) deficits = tgt_placed[bondable_idx] - cn_placed[bondable_idx] order = np.argsort(-deficits) bondable_idx = bondable_idx[order] attempts_per_seed = max( 1000, max_attempts_per_atom // len(bondable_idx) ) # Precompute over-coordination check arrays for this atom type dm_check = _dmax_sq_table[types_placed, sym_idx] tgt_check = _target_cn_arr[types_placed] for seed_idx in bondable_idx: seed_pos = positions[seed_idx] ms_sq = _minsep_sq_table[ placed_type_idx[seed_idx], sym_idx] ms = ms_sq ** 0.5 dm = dm_to_new[seed_idx] for attempt in range(attempts_per_seed): direction = rng.standard_normal(3) direction /= np.linalg.norm(direction) distance = rng.uniform(ms, dm) pos = seed_pos + direction * distance if pbc: pos = pos % L d_vec = pos - positions[:n_placed] if pbc: d_vec -= L * np.round(d_vec / L) d_sq = np.sum(d_vec * d_vec, axis=1) if not np.all(d_sq >= min_dists_sq): continue # Vectorized over-coordination check: # Find neighbours within dmax that are already # at or above target + tolerance (excluding seed) within_dmax = (dm_check > 0) & (d_sq <= dm_check) within_dmax[seed_idx] = False if np.any(within_dmax): over = cn_placed[within_dmax] >= ( tgt_check[within_dmax] + cn_tolerance) if np.any(over): continue # Accept positions[n_placed] = pos placed_symbols.append(sym) placed_type_idx[n_placed] = sym_idx n_placed += 1 _update_cn_array(cn_array, n_placed - 1, positions, n_placed, placed_type_idx, sym_idx, _dmax_sq_table, L, pbc) placed = True break if placed: break # -- Standard path: pure rejection sampling -- if not placed: for attempt in range(max_attempts_per_atom): pos = rng.random(3) * L if n_placed == 0: positions[0] = pos placed_symbols.append(sym) placed_type_idx[0] = sym_idx n_placed = 1 if use_sc: cn_array[0] = 0 placed = True break d_vec = pos - positions[:n_placed] if pbc: d_vec -= L * np.round(d_vec / L) d_sq = np.sum(d_vec * d_vec, axis=1) if not np.all(d_sq >= min_dists_sq): continue # Vectorized over-coordination check if use_sc: types_p = placed_type_idx[:n_placed] dm_sq_check = _dmax_sq_table[types_p, sym_idx] within = (dm_sq_check > 0) & (d_sq <= dm_sq_check) if np.any(within): tgt_w = _target_cn_arr[types_p[within]] cn_w = cn_array[:n_placed][within] if np.any(cn_w >= tgt_w + cn_tolerance): continue positions[n_placed] = pos placed_symbols.append(sym) placed_type_idx[n_placed] = sym_idx n_placed += 1 if use_sc: _update_cn_array(cn_array, n_placed - 1, positions, n_placed, placed_type_idx, sym_idx, _dmax_sq_table, L, pbc) placed = True break if not placed: raise RuntimeError( f"Could not place atom {i+1}/{n_atoms} ({sym}) after " f"{max_attempts_per_atom} attempts.\n" f" Cell: {L:.2f} A, placed {n_placed}/{n_atoms} atoms so far.\n" f" Suggestions:\n" f" 1. Use --target-density with a lower value\n" f" 2. Reduce minsep via --minsep flag\n" f" 3. Try --no-sc to disable coordination-aware placement\n" f" 4. Use batch_random() which auto-retries with reduced M-M minsep" ) # Progress reporting (every 5 seconds for large systems) _now = _time.time() if _now - _last_progress > 5.0: _last_progress = _now elapsed = _now - _t_start logger.info(" Placed %d/%d atoms (%.1f s)", n_placed, n_atoms, elapsed) # Optional repair pass for under-coordinated atoms (experimental). if use_sc and repair_iters > 0: _repair_undercoordination( positions=positions, n_placed=n_placed, placed_type_idx=placed_type_idx, cn_array=cn_array, target_cn_arr=_target_cn_arr, cn_tolerance=cn_tolerance, minsep_sq_table=_minsep_sq_table, dmax_sq_table=_dmax_sq_table, L=L, pbc=pbc, max_iter=repair_iters, rng=rng, ) atoms = Atoms( symbols=placed_symbols, positions=positions[:n_placed], cell=[L, L, L], pbc=pbc, ) atoms.wrap() # Post-placement CN report (coordination-aware mode) sc_report = {} if use_sc: for elem, tgt in target_cn.items(): indices = [k for k, s in enumerate(placed_symbols) if s == elem] cns = [int(cn_array[k]) for k in indices] if cns: sc_report[elem] = { "target": tgt, "mean": float(np.mean(cns)), "min": min(cns), "max": max(cns), } for elem in sorted(set(placed_symbols)): if elem not in target_cn: indices = [k for k, s in enumerate(placed_symbols) if s == elem] cns = [int(cn_array[k]) for k in indices] if cns: sc_report[elem] = { "target": "auto", "mean": float(np.mean(cns)), "min": min(cns), "max": max(cns), } atoms.info["sc_report"] = sc_report return atoms
# ============================================================================== # Batch generation # ============================================================================== # Map output format names to ASE format strings and file extensions _FORMAT_MAP = { "xyz": ("extxyz", ".xyz"), # default: extxyz format, .xyz extension "extxyz": ("extxyz", ".xyz"), # alias "vasp": ("vasp", ".vasp"), "cif": ("cif", ".cif"), }
[docs] def batch_random( composition: dict[str, int], n_structures: int = 1, output_dir: str = "random_structures", output_format: str = "xyz", relax: bool = False, calc=None, fmax: float = 0.05, max_relax_steps: int = 200, optimizer: str = "FIRE", cell_filter: str = "FrechetCellFilter", max_retries: int = 10, resume: bool = False, **kwargs, ) -> list[str]: """ Generate multiple random structures, optionally relaxing each. Parameters ---------- composition : dict n_structures : int output_dir : str output_format : str Output file format: "xyz" (default, extxyz format), "vasp", "cif". relax : bool If True and calc is provided, optimise each structure. calc : ASE calculator, optional fmax : float max_relax_steps : int optimizer : str cell_filter : str max_retries : int **kwargs Forwarded to generate_random(). Returns ------- list of str Paths to generated structure files. Returns file paths (not Atoms objects) because batch generation writes each structure to disk as it is created, enabling recovery from interruptions and keeping memory usage constant regardless of batch size. Notes ----- For a single structure as an Atoms object, use ``generate_random()`` directly. Keyword arguments (``target_cn``, ``minsep``, ``dmax``, ``minsep_scale``, ``cn_tolerance``, ``target_density``, ``density_scale``, ``cell_length_ang``, ``max_attempts_per_atom``) are forwarded to ``generate_random()``. """ if output_format not in _FORMAT_MAP: raise ValueError( f"Unknown output format '{output_format}'. " f"Choose from: {', '.join(_FORMAT_MAP)}" ) ase_format, ext = _FORMAT_MAP[output_format] os.makedirs(output_dir, exist_ok=True) paths = [] # ── Resume support: scan for existing completed structures ── existing_indices = set() if resume: import json as _json # Write or check run metadata for consistency meta_path = os.path.join(output_dir, "run_metadata.json") current_meta = { "composition": composition, "output_format": output_format, "relax": relax, } if os.path.isfile(meta_path): try: with open(meta_path) as mf: prev_meta = _json.load(mf) if prev_meta.get("composition") != composition: import warnings warnings.warn( f"Resume: composition changed from " f"{prev_meta['composition']} to {composition}. " f"Structures may be inconsistent." ) except Exception: pass # corrupted metadata, proceed anyway # Scan for completed files for idx in range(n_structures): if relax: check_file = os.path.join( output_dir, f"random_{idx:04d}_opt{ext}") else: check_file = os.path.join( output_dir, f"random_{idx:04d}{ext}") if os.path.isfile(check_file) and os.path.getsize(check_file) > 0: # Validate ASE can read it try: from ase.io import read as _read _read(check_file) existing_indices.add(idx) paths.append(check_file) except Exception: pass # corrupted file, will regenerate if existing_indices: missing = [i for i in range(n_structures) if i not in existing_indices] print(f" Resume: found {len(existing_indices)} existing " f"structures, generating {len(missing)} more " f"(indices {missing[0]}-{missing[-1]})" if missing else f" Resume: all {n_structures} structures already exist.") else: print(f" Resume: no existing structures found, " f"starting fresh.") base_seed = kwargs.pop("seed", None) target_cn = kwargs.get("target_cn") # If target_cn not specified, auto-detect for logging if target_cn is None: target_cn, _ = _auto_target_cn(composition) if target_cn is not None: kwargs["target_cn"] = target_cn dmax_user = kwargs.get("dmax") # Pre-compute minsep and cell for logging symbols_all = [] for species, count in composition.items(): symbols_all.extend([species] * count) n_atoms = len(symbols_all) formula = Atoms(symbols_all).get_chemical_formula(mode="hill") minsep_log = kwargs.get("minsep") if minsep_log is None: minsep_log = _default_minsep(symbols_all, scale=kwargs.get("minsep_scale", 0.85)) td = kwargs.get("target_density") ds = kwargs.get("density_scale", 1.0) cell_L = kwargs.get("cell_length_ang") if cell_L is None: cell_L = _estimate_cell_length(composition, td, density_scale=ds) vol = cell_L ** 3 total_mass = sum(atomic_masses[atomic_numbers[s]] * n for s, n in composition.items()) est_density = (total_mass / 6.022e23) / (vol * 1e-24) use_sc = target_cn is not None and target_cn != {} dmax_fac = kwargs.get("dmax_factor", 1.5) if use_sc and dmax_user is None: dmax_log = _auto_dmax(minsep_log, target_cn, factor=dmax_fac) else: dmax_log = dmax_user # -- Write log file -- logfile = os.path.join(output_dir, "random_gen.log") bar = "=" * 60 def _log(msg, lf): print(msg) lf.write(msg + "\n") lf.flush() lf = open(logfile, "a" if resume else "w") try: _log(f"\n{bar}", lf) _log(f" AmorphGen - Random Structure Generation", lf) _log(f" Composition: {formula} ({n_atoms} atoms)", lf) _log(f" Cell: {cell_L:.2f} A (cubic), Volume: {vol:.1f} A3", lf) _log(f" Estimated density: {est_density:.2f} g/cm3", lf) if td is not None: _log(f" Target density: {td:.2f} g/cm3", lf) else: has_gas_element = any( ELEMENTAL_DENSITIES.get(s) is None for s in composition ) if has_gas_element: _log(f" NOTE: Auto density is approximate for this composition.", lf) _log(f" Use --target-density for more accurate results.", lf) _log(f" Mode: {'Coordination-aware' if use_sc else 'Standard rejection'}", lf) _log(f" N structures: {n_structures}", lf) _log(f" Output format: {output_format}", lf) _log(f" Output dir: {output_dir}/", lf) if use_sc: _log(f" Target CN: {target_cn}", lf) _log(f"{bar}", lf) # One-line auto-derivation summary — captures the chemistry-informed # decisions (class, OS, CN, bond types, Pauling Δχ, minsep, density) # in a single grep-friendly log line. _log(_format_auto_derive_summary( composition=composition, target_cn=target_cn if use_sc else None, minsep=minsep_log, est_density=est_density, cell_length=cell_L, ), lf) _log(f"\n Minsep values:", lf) for pair in sorted(minsep_log): _log(f" {pair}: {minsep_log[pair]:.3f} A", lf) if use_sc and dmax_log: _log(f"\n Dmax values{' (auto)' if dmax_user is None else ''}:", lf) for pair in sorted(dmax_log): _log(f" {pair}: {dmax_log[pair]:.3f} A", lf) _log("", lf) # -- Generate structures -- # Auto-retry: reduce M-M minsep by 5/10/15/20% generated = 0 seed_offset = 0 failures = 0 retry_level = 0 current_minsep = dict(kwargs.get("minsep") or minsep_log) def _reduce_mm_minsep(ms, reduction): """Reduce same-element minsep values by a factor.""" reduced = dict(ms) for pair in reduced: s1, s2 = pair.split("-") if s1 == s2 and s1 not in NONMETALS: reduced[pair] = reduced[pair] * (1.0 - reduction) return reduced # Save run metadata for resume consistency checks if resume or True: # always write metadata import json as _json meta_path = os.path.join(output_dir, "run_metadata.json") meta = { "composition": composition, "output_format": output_format, "relax": relax, "n_structures": n_structures, } with open(meta_path, "w") as mf: _json.dump(meta, mf, indent=2) # Pre-warm calculator: model load + MPS graph compile happen once, # outside the per-structure timing. Otherwise the first structure # (or first structure after --resume) absorbs ~minutes of init cost. if relax and calc is not None: try: syms = list(composition.keys()) positions = [[2.5 * i, 0.0, 0.0] for i in range(len(syms))] warmup_atoms = Atoms( symbols=syms, positions=positions, cell=[10.0, 10.0, 10.0], pbc=True, ) warmup_atoms.calc = calc t_warm = time.perf_counter() warmup_atoms.get_potential_energy() t_warm = time.perf_counter() - t_warm _log(f" Calculator warmup: {t_warm:.2f} s " f"(model load + first inference)\n", lf) except Exception as e: _log(f" Calculator warmup skipped ({type(e).__name__}: {e})\n", lf) while generated < n_structures: # Skip if resume and this index already completed if generated in existing_indices: _log(f" [{generated+1}/{n_structures}] Already exists " f"-- skipping.", lf) generated += 1 continue seed_i = base_seed + seed_offset if base_seed is not None else None seed_offset += 1 try: atoms = generate_random(composition, seed=seed_i, **kwargs) except RuntimeError: failures += 1 if failures >= max_retries: if retry_level < 3: retry_level += 1 reduction = 0.05 * retry_level new_minsep = _reduce_mm_minsep(current_minsep, reduction) kwargs["minsep"] = new_minsep changed = [] for pair in sorted(new_minsep): s1, s2 = pair.split("-") if s1 == s2 and s1 not in NONMETALS: changed.append(f"{pair}={new_minsep[pair]:.2f}") _log(f" [Auto-retry] Reducing M-M minsep by " f"{reduction*100:.0f}%: {', '.join(changed)}", lf) failures = 0 continue elif retry_level < 4: retry_level += 1 reduction = 0.05 * retry_level new_minsep = _reduce_mm_minsep(current_minsep, reduction) kwargs["minsep"] = new_minsep changed = [] for pair in sorted(new_minsep): s1, s2 = pair.split("-") if s1 == s2 and s1 not in NONMETALS: changed.append(f"{pair}={new_minsep[pair]:.2f}") _log(f" [Auto-retry] Reducing M-M minsep by " f"{reduction*100:.0f}%: {', '.join(changed)}", lf) failures = 0 continue else: _log(f" [Warning] Skipping structure {generated + 1} " f"after {max_retries} attempts x {retry_level+1} " f"retries. Consider using --target-density.", lf) failures = 0 retry_level = 0 generated += 1 continue failures = 0 # Save unrelaxed structure fname = os.path.join(output_dir, f"random_{generated:04d}{ext}") if ase_format == "vasp": sorted_atoms_ur = atoms[atoms.numbers.argsort()] write(fname, sorted_atoms_ur, format=ase_format, sort=True) else: write(fname, atoms, format=ase_format) if relax and calc is not None: from ..utils.common import compute_density_gcm3 from ase.geometry import cell_to_cellpar atoms.calc = calc OptimizerClass = _get_optimizer_class(optimizer) target = _build_cell_filter(atoms, cell_filter) opt_logfile = os.path.join(output_dir, f"random_{generated:04d}_opt.log") # Detailed logging (consistent with batch-opt) formula = atoms.get_chemical_formula(mode="hill") cp = cell_to_cellpar(atoms.cell) vol = atoms.get_volume() density = compute_density_gcm3(atoms) _log(f"\n Composition: {formula} ({len(atoms)} atoms)", lf) _log(f" Initial cell: a={cp[0]:.4f} b={cp[1]:.4f} " f"c={cp[2]:.4f}", lf) _log(f" Volume: {vol:.2f} A^3 Density: {density:.2f} g/cm3", lf) _log(f" Optimizer: {optimizer} fmax={fmax} " f"max_steps={max_relax_steps}", lf) _log(f" Cell filter: {cell_filter}", lf) header = (f"\n {'Step':>5} {'Energy(eV)':>14} " f"{'Fmax(eV/A)':>11} {'a(A)':>10} " f"{'b(A)':>10} {'c(A)':>10} {'Vol(A3)':>10}") sep = " " + "-" * 85 _log(header, lf) _log(sep, lf) opt = OptimizerClass(target, logfile=None) t_relax_start = time.perf_counter() steps_done = 0 for step in range(max_relax_steps): opt.step() energy = atoms.get_potential_energy() forces = target.get_forces() max_f = float((forces ** 2).sum(axis=1).max() ** 0.5) cp = cell_to_cellpar(atoms.cell) vol = atoms.get_volume() line = (f" {step+1:5d} {energy:14.6f} {max_f:11.6f} " f"{cp[0]:10.4f} {cp[1]:10.4f} {cp[2]:10.4f} " f"{vol:10.1f}") _log(line, lf) steps_done = step + 1 if max_f < fmax: _log(sep, lf) _log(f"\n Converged after {step+1} steps! " f"Fmax = {max_f:.6f} eV/A", lf) break else: _log(sep, lf) _log(f"\n WARNING: did not converge in " f"{max_relax_steps} steps.", lf) t_relax = time.perf_counter() - t_relax_start per_step = t_relax / steps_done if steps_done else float("nan") _log(f" Wall time: {t_relax:.2f} s " f"({steps_done} steps, {per_step:.3f} s/step)", lf) density_f = compute_density_gcm3(atoms) _log(f" Final density: {density_f:.2f} g/cm3", lf) # Save relaxed structure as _opt fname_opt = os.path.join( output_dir, f"random_{generated:04d}_opt{ext}") if ase_format == "vasp": sorted_atoms_r = atoms[atoms.numbers.argsort()] write(fname_opt, sorted_atoms_r, format=ase_format, sort=True) else: write(fname_opt, atoms, format=ase_format) fname = fname_opt # return path to relaxed version paths.append(fname) out_formula = atoms.get_chemical_formula(mode="hill") seed_str = f" (seed={seed_i})" if seed_i is not None else "" _log(f" [{generated+1}/{n_structures}] {out_formula} -> " f"{fname}{seed_str}", lf) sc_report = atoms.info.get("sc_report") if sc_report: for elem, data in sorted(sc_report.items()): _log(f" CN: {elem} target={data['target']}, " f"mean={data['mean']:.1f}, " f"range=[{data['min']},{data['max']}]", lf) generated += 1 _log(f"\n Generated {len(paths)} structures in {output_dir}/", lf) if len(paths) < n_structures: _log(f" Warning: only {len(paths)}/{n_structures} structures " f"were successfully generated.", lf) _log(f" Log saved: {logfile}", lf) finally: lf.close() return paths