Source code for csd_operations

"""
Module: csd_operations.py 

High-level interface for interacting with the Cambridge Structural Database (CSD).

This module provides functionality to:
- Extract and filter refcode families
- Cluster structures by packing similarity
- Select representative structures using the vdWFV metric
- Save intermediate results to CSV

Dependencies
------------
pandas
networkx
ccdc
csd_structure_validator
"""

from pathlib import Path
from typing import Dict, Union, List, Tuple, Optional
import logging
import pandas as pd
import networkx as nx
import os
from itertools import combinations
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
from ccdc import io
from ccdc.crystal import PackingSimilarity
from csd_structure_validator import StructureValidator

logger = logging.getLogger(__name__)

[docs] @dataclass class SimilaritySettings: """ Configuration settings for packing similarity comparisons of crystal structures. Parameters ---------- distance_tolerance : float, default=0.2 Maximum allowed deviation in atomic distances (Å) when comparing packings. angle_tolerance : float, default=20.0 Maximum allowed angular deviation (degrees) between molecular orientations. ignore_bond_types : bool, default=True If True, matching bond orders are not required for similarity. ignore_hydrogen_counts : bool, default=True If True, differences in hydrogen counts are ignored. ignore_hydrogen_positions : bool, default=True If True, explicit hydrogen coordinate differences are ignored. packing_shell_size : int, default=15 Number of molecules considered in each packing-shell comparison. ignore_spacegroup : bool, default=True If True, space-group designations are not required to match. normalise_unit_cell : bool, default=True If True, unit cell parameters are normalized before comparison. """ distance_tolerance: float = 0.2 angle_tolerance: float = 20.0 ignore_bond_types: bool = True ignore_hydrogen_counts: bool = True ignore_hydrogen_positions: bool = True packing_shell_size: int = 15 ignore_spacegroup: bool = True normalise_unit_cell: bool = True
[docs] class CSDOperations: """ High-level interface for querying, validating, clustering, and selecting crystal structures from the Cambridge Structural Database (CSD). Attributes ---------- data_directory : Path Base directory for reading and writing CSD-related files. data_prefix : str Prefix used when naming output files. reader : io.EntryReader CCDC EntryReader instance connected to the "CSD" database. similarity_engine : PackingSimilarity Engine for computing pairwise packing similarity. """
[docs] def __init__(self, data_directory: Union[str, Path], data_prefix: str): """ Initialize CSDOperations with target directory and filename prefix. Parameters ---------- data_directory : Union[str, Path] Directory under which all CSV outputs will be saved. data_prefix : str Prefix for generated CSV filenames (e.g., "<prefix>_refcode_families.csv"). """ self.data_directory = Path(data_directory) self.data_prefix = data_prefix self.reader = io.EntryReader("CSD") self._setup_similarity_engine()
def _setup_similarity_engine(self, settings: Optional[SimilaritySettings] = None): """ Instantiate and configure the packing similarity engine. Parameters ---------- settings : SimilaritySettings, optional Custom threshold settings. If None, default settings are used. """ settings = settings or SimilaritySettings() self.similarity_engine = PackingSimilarity() self.similarity_engine.settings.distance_tolerance = settings.distance_tolerance self.similarity_engine.settings.angle_tolerance = settings.angle_tolerance self.similarity_engine.settings.ignore_bond_types = settings.ignore_bond_types self.similarity_engine.settings.ignore_hydrogen_counts = settings.ignore_hydrogen_counts self.similarity_engine.settings.ignore_hydrogen_positions = settings.ignore_hydrogen_positions self.similarity_engine.settings.packing_shell_size = settings.packing_shell_size
[docs] def get_refcode_families_df(self) -> pd.DataFrame: """ Query the CSD and group entries by base refcode. Returns ------- pd.DataFrame DataFrame with columns: - family_id : str, first six characters of the refcode - refcode : str, full CSD refcode """ records = [] for entry in self.reader: identifier = entry.identifier family_id = identifier[:6] records.append({"family_id": family_id, "refcode": identifier}) return pd.DataFrame(records)
[docs] def save_refcode_families_csv(self, df: Optional[pd.DataFrame] = None, filename: Optional[Union[str, Path]] = None) -> None: """ Write the refcode-families DataFrame to a CSV file. Parameters ---------- df : pd.DataFrame, optional DataFrame to save. If None, uses get_refcode_families_df(). filename : Union[str, Path], optional Full file path for output. If None, defaults to data_directory / f"{data_prefix}_refcode_families.csv". Raises ------ OSError If writing to disk fails. """ if df is None: df = self.get_refcode_families_df() output_path = Path(filename) if filename else self.data_directory / f"{self.data_prefix}_refcode_families.csv" self.data_directory.mkdir(parents=True, exist_ok=True) df.to_csv(output_path, index=False)
[docs] def filter_families_by_size(self, df: pd.DataFrame, min_size: int = 2) -> pd.DataFrame: """ Exclude families with fewer than a specified number of members. Parameters ---------- df : pd.DataFrame DataFrame with columns ['family_id', 'refcode']. min_size : int, default=2 Minimum number of members for a family to be retained. Returns ------- pd.DataFrame Filtered DataFrame. Raises ------ KeyError If 'family_id' column is missing. """ counts = df['family_id'].value_counts() valid = counts[counts >= min_size].index return df[df['family_id'].isin(valid)]
[docs] def cluster_families(self, filters: Dict) -> Dict[str, List[List[str]]]: """ Perform packing similarity clustering on each refcode family. Workflow -------- 1. Load initial refcode families CSV. 2. Group refcodes by 'family_id'. 3. For each group, validate entries and build a similarity graph. 4. Identify connected components as clusters. 5. Save clustered results to CSV. Parameters ---------- filters : Dict[str, Any] Criteria for structure validation. Returns ------- pd.DataFrame DataFrame with columns ['family_id', 'refcode', 'cluster_id']. Raises ------ FileNotFoundError If the initial CSV is missing. RuntimeError If clustering fails for any family. """ df_path = self.data_directory / f"{self.data_prefix}_refcode_families.csv" if not df_path.exists(): raise FileNotFoundError(f"Refcode families CSV not found: {df_path}") df = pd.read_csv(df_path) families = df.groupby("family_id")["refcode"].apply(list).to_dict() args = [(fam_id, refcodes, filters) for fam_id, refcodes in families.items()] max_workers = os.cpu_count() - 4 or 4 records = [] # ← NEW with ProcessPoolExecutor(max_workers=max_workers) as executor: for fam_id, clusters in executor.map(_process_single_family, args): for cluster_idx, cluster in enumerate(clusters, start=1): for refcode in cluster: records.append( {"family_id": fam_id, "refcode": refcode, "cluster_id": cluster_idx} ) df_clusters = pd.DataFrame(records) self._save_clustered_families(df_clusters) return df_clusters
[docs] def _check_structure(self, identifier: str, filters: Dict, entry: Optional[io.Entry] = None) -> bool: """ Validate a CSD entry against filter criteria. Parameters ---------- identifier : str CSD refcode. filters : Dict[str, Any] Validation criteria. entry : io.Entry, optional Preloaded CSD entry. If None, loaded internally. Returns ------- bool True if the structure is valid, False otherwise. Raises ------ Exception If validation fails unexpectedly. """ if entry is None: entry = self.reader.entry(identifier) validator = StructureValidator(filters) result = validator.validate(entry.crystal, entry.molecule) return result.is_valid
def _save_clustered_families(self, df: pd.DataFrame) -> None: """ Save clustered families DataFrame to CSV. Parameters ---------- df : pd.DataFrame Must include ['family_id', 'refcode', 'cluster_id']. Raises ------ OSError If file writing fails. """ output_file = ( self.data_directory / f"{self.data_prefix}_refcode_families_clustered.csv" ) df.to_csv(output_file, index=False) logger.info(f"Saved clustered families to {output_file}")
[docs] def get_unique_structures(self, filters: Dict, method: str = "vdWFV") -> pd.DataFrame: # noqa: D401 """ Select one representative per cluster using the vdWFV metric. Workflow -------- 1. Load clustered families CSV. 2. Group by ['family_id', 'cluster_id']. 3. Compute vdWFV for each refcode; select the minimum. 4. Save unique representatives to CSV. Parameters ---------- filters : Dict[str, Any] Placeholder for revalidation filters. method : str, default="vdWFV" Only 'vdWFV' is supported. Returns ------- pd.DataFrame DataFrame with columns ['family_id', 'refcode']. Raises ------ FileNotFoundError If clustered CSV is missing. NotImplementedError If method is not 'vdWFV'. """ if method != "vdWFV": raise NotImplementedError("Only vdWFV method is supported.") csv_clusters = self.data_directory / f"{self.data_prefix}_refcode_families_clustered.csv" if not csv_clusters.exists(): raise FileNotFoundError(csv_clusters) df_clusters = pd.read_csv(csv_clusters) if "cluster_id" in df_clusters.columns: group_cols = ["family_id", "cluster_id"] else: # fall‑back – treat each family as single cluster group_cols = ["family_id"] df_clusters["cluster_id"] = 1 # dummy column for grouping # build argument list for executor args: List[Tuple[str, List[str]]] = [ (fam_id, grp["refcode"].tolist()) for (fam_id, _), grp in df_clusters.groupby(group_cols) ] reps: List[Tuple[str, str]] = [] max_workers = os.cpu_count() - 4 or 4 with ProcessPoolExecutor(max_workers=max_workers) as exe: for fam_id, rep_rc in exe.map(_representative_for_cluster, args): reps.append((fam_id, rep_rc)) df_unique = pd.DataFrame(reps, columns=["family_id", "refcode"]).drop_duplicates() self._save_unique_structures(df_unique) return df_unique
[docs] def _save_unique_structures(self, df: pd.DataFrame) -> None: # unchanged except docstring tweak """ Save unique structure representatives to CSV. Parameters ---------- df : pd.DataFrame DataFrame with columns ['family_id', 'refcode']. Raises ------ OSError If file writing fails. """ out = self.data_directory / f"{self.data_prefix}_refcode_families_unique.csv" df.to_csv(out, index=False) logger.info("Saved unique structures to %s", out)
[docs] def _process_single_family(args: Tuple[str, List[str], Dict]) -> Tuple[str, List[List[str]]]: """ Validate and cluster a single refcode family by packing similarity. Parameters ---------- args : Tuple[str, List[str], Dict[str, Any]] - family_id : str - structures : List[str] - filters : Dict of validation criteria Returns ------- Tuple[str, List[List[str]]] family_id and list of clusters (each a list of refcodes). Raises ------ Exception If any error occurs during processing. """ family_id, structures, filters = args reader = io.EntryReader("CSD") validator = StructureValidator(filters) valid = [] for identifier in structures: try: entry = reader.entry(identifier) if validator.validate(entry.crystal, entry.molecule).is_valid: valid.append((identifier, entry.crystal)) except Exception: continue sim = PackingSimilarity() G = nx.Graph() G.add_nodes_from(refcode for refcode, _ in valid) for (ref1, cry1), (ref2, cry2) in combinations(valid, 2): try: result = sim.compare(cry1, cry2) if result and result.nmatched_molecules == sim.settings.packing_shell_size and result.rmsd < 1.0: G.add_edge(ref1, ref2) except RuntimeError: continue return family_id, [sorted(group) for group in nx.connected_components(G)]
def _vdwfv_for_refcode(refcode: str) -> float: """ Compute the vdWFV metric (1 − packing coefficient) for a refcode. Parameters ---------- refcode : str CSD refcode. Returns ------- float vdWFV value. Raises ------ Exception If entry reading or coefficient retrieval fails. """ reader = io.EntryReader("CSD") cry = reader.entry(refcode).crystal return 1.0 - cry.packing_coefficient
[docs] def _representative_for_cluster(args: Tuple[str, List[str]]) -> Tuple[str, str]: """ Select the refcode with minimal vdWFV in a cluster. Parameters ---------- args : Tuple[str, List[str]] - family_id : str - cluster : List[str] Returns ------- Tuple[str, str] family_id and representative refcode. Raises ------ Exception If any lookup fails. """ family_id, cluster = args vdw_vals = {rc: _vdwfv_for_refcode(rc) for rc in cluster} lowest = min(vdw_vals.values()) rep = sorted([rc for rc, v in vdw_vals.items() if v == lowest])[0] return family_id, rep