Source code for crystal_analyzer

"""
Module: crystal_analyzer.py 

Main orchestration logic for extracting and processing molecular-crystal data
from the Cambridge Structural Database (CSD).

This module defines the CrystalAnalyzer class, which orchestrates the end-to-end
pipeline for:
- Extraction of refcode families
- Clustering of structures
- Extraction of structure-specific data
- Post-extraction processing (e.g., computing fragment properties)

Dependencies
------------
pandas
torch
csa_config
csd_operations
structure_data_extractor
structure_post_extraction_processor
"""

import logging
import pandas as pd

from pathlib import Path
from datetime import datetime
from typing import Type

import torch

from csa_config import ExtractionConfig
from csd_operations import CSDOperations
from structure_data_extractor import StructureDataExtractor
from structure_post_extraction_processor import StructurePostExtractionProcessor

logger = logging.getLogger(__name__)


[docs] class CrystalAnalyzer: """ Orchestrates the end-to-end extraction and processing pipeline for molecular- crystal data from the CSD. Attributes ---------- extraction_config : ExtractionConfig Controls which extraction substeps to run, batch sizes, file paths, and CSD filtering criteria. csd_ops : CSDOperations Handles direct interactions with the CSD (refcode families, downloads, etc.). extractor : StructureDataExtractor Performs detailed per-structure data extraction and parsing into HDF5. data_dir : pathlib.Path Directory where intermediate and output data (CSV, HDF5) are stored. """
[docs] def __init__( self, extraction_config: ExtractionConfig, csd_ops_cls: Type[CSDOperations] = CSDOperations, extractor_cls: Type[StructureDataExtractor] = StructureDataExtractor, ): """ Initialize the CrystalAnalyzer pipeline with specified configurations. Parameters ---------- extraction_config : ExtractionConfig Configuration object controlling which extraction substeps to run, batch sizes, file paths, and CSD filtering criteria. csd_ops_cls : Type[CSDOperations], optional Class implementing CSD operations. Default is CSDOperations. extractor_cls : Type[StructureDataExtractor], optional Class for extracting structure-specific data. Default is StructureDataExtractor. Raises ------ RuntimeError If any batched computation fails (e.g., OOM) or if shape mismatches occur when writing back to HDF5. IOError If appending to or reading from the HDF5 file fails. Notes ----- Data flows:: raw_HDF5 -> (load into torch tensors on CPU) -> run batch computations -> append new datasets to HDF5 -> log memory utilization and batch progress """ self.extraction_config = extraction_config # Resolve paths and create directories self.data_dir = Path(self.extraction_config.data_directory) self._setup_directories() # Instantiate low‐level CSD operations handler self.csd_ops = csd_ops_cls( data_directory=self.data_dir, data_prefix=self.extraction_config.data_prefix ) # Instantiate the StructureDataExtractor for detailed per‐structure data h5_path = self.data_dir / f"{self.extraction_config.data_prefix}.h5" self.extractor = extractor_cls( hdf5_path=h5_path, filters={ **self.extraction_config.filters, "data_directory": str(self.extraction_config.data_directory), "data_prefix": self.extraction_config.data_prefix }, batch_size=self.extraction_config.extraction_batch_size )
def _setup_directories(self) -> None: """ Verify and create (if needed) the output directory before any file I/O. This method checks: - extraction_config.data_directory / “raw” subfolder for downloaded CSD files - extraction_config.data_directory / “processed” subfolder for HDF5 outputs If any directory does not exist, it is created. Logs an INFO message for each new directory created. Raises ------ OSError If directory creation fails due to permission issues or invalid paths. """ self.data_dir.mkdir(parents=True, exist_ok=True)
[docs] def extract_data(self) -> None: """ Execute all data-extraction substeps specified by extraction_config.actions. The sequence of substeps is: 1. _extract_refcode_families (if actions.get("get_refcode_families") is True) 2. _cluster_refcode_families (if actions.get("cluster_refcode_families") is True) 3. _extract_unique_structures (if actions.get("get_unique_structures") is True) 4. _extract_structure_data (if actions.get("get_structure_data") is True) 5. _post_extraction_process (if actions.get("post_extraction_process") is True) During each substep, corresponding CSV/HDF5 files are generated (refcode lists, clustered families, per-structure atom lists, fragment datasets, etc.). The elapsed time for the entire pipeline is logged at INFO level. Raises ------ Exception If any substep fails (e.g., network error fetching from CSD, parsing error). """ try: logger.info("Starting data extraction pipeline...") start = datetime.now() if self.extraction_config.actions.get("get_refcode_families"): self._extract_refcode_families() if self.extraction_config.actions.get("cluster_refcode_families"): self._cluster_refcode_families() if self.extraction_config.actions.get("get_unique_structures"): self._extract_unique_structures() if self.extraction_config.actions.get("get_structure_data"): self._extract_structure_data() if self.extraction_config.actions.get("post_extraction_process"): self._post_extraction_process() duration = datetime.now() - start logger.info(f"Data extraction completed in {duration}") except Exception as e: logger.error(f"Error during extraction: {e}") raise
[docs] def _extract_refcode_families(self) -> pd.DataFrame: """ Query CSD to retrieve all refcode families, save to disk, and return. This method performs the following steps: - Invoke self.csd_ops.get_refcode_families_df() - Receive a DataFrame with columns ['family_id', 'refcode'] - Write the DataFrame to disk at: extraction_config.data_directory / f"{extraction_config.data_prefix}_refcode_families.csv" - Log the number of families retrieved at INFO level Returns ------- pandas.DataFrame DataFrame with columns: - family_id : Unique integer or string ID for each refcode family - refcode : CSD refcode belonging to that family """ logger.info("Extracting refcode families into DataFrame...") df = self.csd_ops.get_refcode_families_df() self.csd_ops.save_refcode_families_csv() n_structures = len(df) n_families = df['family_id'].nunique() logger.info(f"Extracted {n_structures} structures across {n_families} families") return df
[docs] def _cluster_refcode_families(self) -> None: """ Group structures within each refcode family according to packing similarity. This method performs the following steps: - Read the CSV produced by _extract_refcode_families() - For each family_id, call self.csd_ops.cluster_families(family_id, output_path) to perform clustering of atomic coordinates. - Save clustering results to: extraction_config.data_directory / f"{extraction_config.data_prefix}_clustered_families.csv" - Log the number of clusters and cluster sizes at INFO level. Raises ------ RuntimeError If clustering fails for any family (e.g., insufficient data, corrupted CIF). """ try: logger.info("Clustering refcode families...") clustered = self.csd_ops.cluster_families(self.extraction_config.filters) logger.info(f"Refcode families clustered into {len(clustered)} groups.") except Exception as e: logger.error(f"Clustering of refcode families failed. {e}") raise
[docs] def _extract_unique_structures(self) -> None: """ Retrieve unique crystal structures for each cluster representative. This method performs the following steps: - Read the clustered families CSV to identify one representative refcode per cluster. - For each representative refcode: - Use self.csd_ops.get_unique_structures() to fetch atomic coordinates, symmetry operators, and other metadata. - Save the raw CIF to: extraction_config.data_directory / f"{extraction_config.data_prefix}_structures/{refcode}.cif" - Update and log status (total structures fetched, failures, retries). Raises ------ IOError If any CIF fails to download or write to disk. """ logger.info("Selecting unique structures …") df_unique = self.csd_ops.get_unique_structures( self.extraction_config.filters, method="vdWFV" ) logger.info( "Unique structures selected: %d structures across %d families", len(df_unique), df_unique['family_id'].nunique() )
[docs] def _extract_structure_data(self) -> None: """ Parse each downloaded CIF and extract fundamental structure data into HDF5. For each CIF in extraction_config.data_directory: - Use StructureDataExtractor to read atomic labels, fractional coordinates, symmetry operations, lattice parameters, and partial charges. - Organize the extracted data into a pandas DataFrame. - Batch-write the data to: extraction_config.data_directory / f"{extraction_config.data_prefix}_structure_data.h5" - Log the total number of structures processed and any parse errors. This method ensures that all per-structure numerics (coords, masks, labels) are stored in GPU-friendly formats for further GPU processing. Raises ------ ValueError If CIF parsing yields inconsistent shapes (e.g., mismatched atom count vs. mask). IOError If HDF5 write fails due to disk space or file permissions. """ h5_path = self.data_dir / f"{self.extraction_config.data_prefix}.h5" logger.info(f"Extracting detailed structure data into {h5_path} …") self.extractor.run() logger.info(f"Detailed structure data extracted and saved to {h5_path}")
[docs] def _post_extraction_process(self) -> None: """ Perform all post-extraction computations on the raw structure data. This step typically includes: - Fragment identification (rigid-fragment or molecular fragment detection) - Computation of fragment centers of mass (Cartesian & fractional) - Computation of fragment inertia tensors, eigenvalues, and quaternions - Computation of all intermolecular contacts and hydrogen-bond identification - Computation of distances/vectors from each contact atom to fragment COM - Augmentation of HDF5 datasets with new variable-length datasets for fragment-related properties Notes ----- Data flows:: raw_HDF5 -> (load into torch tensors on CPU) -> run batch computations -> append new datasets to HDF5 -> log memory utilization and batch progress Raises ------ RuntimeError If any batched computation fails (e.g., OOM) or if shape mismatches occur when writing back to HDF5. IOError If appending to or reading from the HDF5 file fails. """ h5_path = self.data_dir / f"{self.extraction_config.data_prefix}.h5" proc = StructurePostExtractionProcessor( hdf5_path=h5_path, batch_size=self.extraction_config.post_extraction_batch_size, device=torch.device("cuda") ) proc.run()