"""
Class for extracting information from the HDF5 files generated during simulation by the
:obj:`schnetpack.md.simulation_hooks.logging_hooks.FileLogger`.
In addition to loading structures, velocities, etc., various postprocessing functions are available.
"""
import json
import logging
import h5py
import numpy as np
from ase import Atoms
from typing import Optional
from tqdm import trange
from schnetpack import properties, units
log = logging.getLogger(__name__)
class HDF5LoaderError(Exception):
"""
Exception for HDF5 loader class.
"""
pass
[docs]class HDF5Loader:
"""
Class for loading HDF5 datasets written by the FileLogger. By default, this requires at least a MoleculeStream to be
present. PropertyData is also read by default, but can be disabled.
Args:
hdf5_database (str): Path to the database file.
skip_initial (int): Skip the initial N configurations in the trajectory, e.g. to account for equilibration
(default=0).
load_properties (bool): Extract and reconstruct the property data stored by a PropertyStream (e.g. forces,
energies, etc.), enabled by default.
"""
def __init__(
self,
hdf5_database: str,
skip_initial: Optional[int] = 0,
load_properties: Optional[bool] = True,
):
self.database = h5py.File(hdf5_database, "r", swmr=True, libver="latest")
self.skip_initial = skip_initial
self.data_groups = list(self.database.keys())
self.properties = {}
# Load basic structure properties and MD info
if "molecules" not in self.data_groups:
raise HDF5LoaderError(
"Molecule data not found in {:s}".format(hdf5_database)
)
else:
self._load_molecule_data()
# If requested, load other properties predicted by the model stored via PropertyStream
if load_properties:
if "properties" not in self.data_groups:
raise HDF5LoaderError(
"Molecule properties not found in {:s}".format(hdf5_database)
)
else:
self._load_property_data()
# Do formatting for info
loaded_properties = list(self.properties.keys())
if len(loaded_properties) == 1:
loaded_properties = str(loaded_properties[0])
else:
loaded_properties = (
", ".join(loaded_properties[:-1]) + " and " + loaded_properties[-1]
)
log.info(
"Loaded properties {:s} from {:s}".format(loaded_properties, hdf5_database)
)
def _load_molecule_data(self):
"""
Load data stored by a MoleculeStream. This contains basic information on the system and is required to be
present for the Loader.
"""
# This is for molecule streams
structures = self.database["molecules"]
# General database info
self.n_replicas = structures.attrs["n_replicas"]
self.n_molecules = structures.attrs["n_molecules"]
self.total_n_atoms = structures.attrs["total_n_atoms"]
self.n_atoms = structures.attrs["n_atoms"]
self.time_step = structures.attrs["time_step"]
# Set up molecule ranges
self.molecule_range = np.pad(np.cumsum(self.n_atoms), (1, 0), mode="constant")
# Determine loading
self.total_entries = structures.attrs["entries"]
self.entries = self.total_entries - self.skip_initial
# Get atom types and masses
self.properties[properties.Z] = structures.attrs["atom_types"]
self.properties[properties.masses] = structures.attrs["masses"]
# Get position and velocity blocks
raw_positions = structures[self.skip_initial : self.total_entries]
entry_width = self.total_n_atoms * 3
# Extract energies
entry_start = 0
entry_stop = self.n_molecules
self.properties[f"{properties.energy}_system"] = raw_positions[
:, :, entry_start:entry_stop
].reshape(self.entries, self.n_replicas, self.n_molecules)
# Extract positions
entry_start = entry_stop
entry_stop += entry_width
self.properties[properties.R] = raw_positions[
:, :, entry_start:entry_stop
].reshape(self.entries, self.n_replicas, self.total_n_atoms, 3)
# Extract velocities if present
if structures.attrs["has_velocities"]:
entry_start = entry_stop
entry_stop += entry_width
self.properties["velocities"] = raw_positions[
:, :, entry_start:entry_stop
].reshape(self.entries, self.n_replicas, self.total_n_atoms, 3)
# Extract quantities due to simulation in box
if structures.attrs["has_cells"]:
# Get simulation cells
entry_start = entry_stop
entry_stop += 9 * self.n_molecules
self.properties[properties.cell] = raw_positions[
:, :, entry_start:entry_stop
].reshape(self.entries, self.n_replicas, self.n_molecules, 3, 3)
# Get stress tensor
entry_start = entry_stop
entry_stop += 9 * self.n_molecules
self.properties[f"{properties.stress}_system"] = raw_positions[
:, :, entry_start:entry_stop
].reshape(self.entries, self.n_replicas, self.n_molecules, 3, 3)
else:
self.properties[properties.cell] = None
self.pbc = structures.attrs["pbc"]
def _load_property_data(self):
"""
Load properties and their shape from the corresponding group in the hdf5 database.
Properties are then reshaped to original form and stores in the self.properties dictionary.
"""
# And for property stream
properties = self.database["properties"]
property_shape = json.loads(properties.attrs["shapes"])
property_positions = json.loads(properties.attrs["positions"])
# Reformat properties
for prop in property_positions:
prop_pos = slice(*property_positions[prop])
self.properties[prop] = properties[
self.skip_initial : self.total_entries, :, prop_pos
].reshape(
(
self.total_entries - self.skip_initial,
self.n_replicas,
*property_shape[prop],
)
)
def get_property(
self,
property_name: str,
atomistic: bool,
mol_idx: Optional[int] = 0,
replica_idx: Optional[int] = None,
):
"""
Extract property from dataset.
Args:
property_name (str): Name of the property as contained in the self.properties dictionary.
atomistic (bool): Whether the property is atomistic (e.g. forces) or defined for the whole molecule
(e.g. energies, dipole moments).
mol_idx (int): Index of the molecule to extract, by default uses the first molecule (mol_idx=0)
replica_idx (int): Replica of the molecule to extract (e.g. for ring polymer molecular dynamics). If
replica_idx is set to None (default), the centroid is returned if multiple replicas are
present.
Returns:
np.array: N_steps x property dimensions array containing the requested property collected during
the simulation.
"""
if atomistic:
mol_idx = slice(
self.molecule_range[mol_idx], self.molecule_range[mol_idx + 1]
)
else:
mol_idx = mol_idx
# Check whether property is present
if property_name not in self.properties:
raise HDF5LoaderError(f"Property {property_name} not found in database.")
if self.properties[property_name] is None:
# Typically used for cells
return None
elif property_name == properties.Z or property_name == properties.masses:
# Special case for atom types and masses
return self.properties[property_name][mol_idx]
else:
# Standard properties
target_property = self.properties[property_name][:, :, mol_idx, ...]
# Compute the centroid unless requested otherwise
if replica_idx is None:
if target_property is not None:
target_property = np.mean(target_property, axis=1)
else:
if target_property is not None:
target_property = target_property[:, replica_idx, ...]
return target_property
def get_velocities(
self, mol_idx: Optional[int] = 0, replica_idx: Optional[int] = None
):
"""
Auxiliary routine for getting the velocities of specific molecules and replicas.
Args:
mol_idx (int): Index of the molecule to extract, by default uses the first molecule (mol_idx=0)
replica_idx (int): Replica of the molecule to extract (e.g. for ring polymer molecular dynamics). If
replica_idx is set to None (default), the centroid is returned if multiple replicas are
present.
Returns:
np.array: N_steps x N_atoms x 3 array containing the atom velocities of the simulation in internal units.
"""
return self.get_property(
"velocities", atomistic=True, mol_idx=mol_idx, replica_idx=replica_idx
)
def get_positions(
self, mol_idx: Optional[int] = 0, replica_idx: Optional[int] = None
):
"""
Auxiliary routine for getting the positions of specific molecules and replicas.
Args:
mol_idx (int): Index of the molecule to extract, by default uses the first molecule (mol_idx=0)
replica_idx (int): Replica of the molecule to extract (e.g. for ring polymer molecular dynamics). If
replica_idx is set to None (default), the centroid is returned if multiple replicas are
present.
Returns:
np.array: N_steps x N_atoms x 3 array containing the atom positions of the simulation in internal units.
"""
return self.get_property(
properties.R, atomistic=True, mol_idx=mol_idx, replica_idx=replica_idx
)
def get_kinetic_energy(
self, mol_idx: Optional[int] = 0, replica_idx: Optional[int] = None
):
"""
Auxiliary routine for computing the kinetic energy of every configuration based on it's velocities.
Args:
mol_idx (int): Index of the molecule to extract, by default uses the first molecule (mol_idx=0)
replica_idx (int): Replica of the molecule to extract (e.g. for ring polymer molecular dynamics). If
replica_idx is set to None (default), the centroid is returned if multiple replicas are
present.
Returns:
np.array: N_steps array containing the kinetic energy of every configuration in internal units.
"""
# Get the velocities
velocities = self.get_velocities(mol_idx=mol_idx, replica_idx=replica_idx)
# Get the masses
masses = self.get_property(
properties.masses, atomistic=True, mol_idx=mol_idx, replica_idx=replica_idx
)
# Compute the kinetic energy as 1/2*m*v^2
kinetic_energy = 0.5 * np.sum(
masses[None, :, None] * velocities**2, axis=(1, 2)
)
return kinetic_energy
def get_potential_energy(
self, mol_idx: Optional[int] = 0, replica_idx: Optional[int] = None
):
"""
Auxiliary routine for extracting a systems potential energy.
Args:
mol_idx (int): Index of the molecule to extract, by default uses the first molecule (mol_idx=0)
replica_idx (int): Replica of the molecule to extract (e.g. for ring polymer molecular dynamics). If
replica_idx is set to None (default), the centroid is returned if multiple replicas are
present.
Returns:
np.array: N_steps array containing the potential energy of every configuration in internal units.
"""
energy_key = f"{properties.energy}_system"
return self.get_property(
energy_key, atomistic=False, mol_idx=mol_idx, replica_idx=replica_idx
)
def get_temperature(
self, mol_idx: Optional[int] = 0, replica_idx: Optional[int] = None
):
"""
Auxiliary routine for computing the instantaneous temperature of every configuration.
Args:
mol_idx (int): Index of the molecule to extract, by default uses the first molecule (mol_idx=0)
replica_idx (int): Replica of the molecule to extract (e.g. for ring polymer molecular dynamics). If
replica_idx is set to None (default), the centroid is returned if multiple replicas are
present.
Returns:
np.array: N_steps array containing the temperature of every configuration in Kelvin.
"""
# Get the velocities
# Get the kinetic energy
kinetic_energy = self.get_kinetic_energy(
mol_idx=mol_idx, replica_idx=replica_idx
)
# Compute the temperature
temperature = 2.0 / (3.0 * units.kB * self.n_atoms[mol_idx]) * kinetic_energy
return temperature
def get_volume(self, mol_idx: Optional[int] = 0, replica_idx: Optional[int] = None):
"""
Auxiliary routine for computing the cell volume in periodic simulations.
Args:
mol_idx (int): Index of the molecule to extract, by default uses the first molecule (mol_idx=0)
replica_idx (int): Replica of the molecule to extract (e.g. for ring polymer molecular dynamics). If
replica_idx is set to None (default), the centroid is returned if multiple replicas are
present.
Returns:
np.array: N_steps array containing the cell volume of every configuration in internal units.
"""
cells = self.get_property(
properties.cell, atomistic=False, mol_idx=mol_idx, replica_idx=replica_idx
)
return np.linalg.det(cells)
def get_stress(self, mol_idx: Optional[int] = 0, replica_idx: Optional[int] = None):
"""
Auxiliary routine for extracting the stress tensor in cell simulations.
Args:
mol_idx (int): Index of the molecule to extract, by default uses the first molecule (mol_idx=0)
replica_idx (int): Replica of the molecule to extract (e.g. for ring polymer molecular dynamics). If
replica_idx is set to None (default), the centroid is returned if multiple replicas are
present.
Returns:
np.array: N_steps x 3 x 3 array containing the stress tensor of every configuration in internal units.
"""
stress_key = f"{properties.stress}_system"
return self.get_property(
stress_key, atomistic=False, mol_idx=mol_idx, replica_idx=replica_idx
)
def get_pressure(
self, mol_idx: Optional[int] = 0, replica_idx: Optional[int] = None
):
"""
Auxiliary routine for computing the pressure in periodic simulations.
Args:
mol_idx (int): Index of the molecule to extract, by default uses the first molecule (mol_idx=0)
replica_idx (int): Replica of the molecule to extract (e.g. for ring polymer molecular dynamics). If
replica_idx is set to None (default), the centroid is returned if multiple replicas are
present.
Returns:
np.array: N_steps array containing the pressure of every configuration in bar.
"""
# Get kinetic energy and volume
kinetic_energy = self.get_kinetic_energy(
mol_idx=mol_idx, replica_idx=replica_idx
)
volume = self.get_volume(mol_idx=mol_idx, replica_idx=replica_idx)
# Compute isotropic stress
stress = (
np.einsum(
"bii->b", self.get_stress(mol_idx=mol_idx, replica_idx=replica_idx)
)
/ 3.0
)
# Compute the pressure and convert to bar
pressure = (2.0 / 3.0 * kinetic_energy / volume - stress) / units.bar
return pressure
def convert_to_atoms(
self, mol_idx: Optional[int] = 0, replica_idx: Optional[int] = None
):
"""
Converts molecular structures to a list of ASE Atom objects. Length units are converted from the internal unit
system to Angstrom.
Args:
mol_idx (optional, int): Index of molecule to extract (default=0).
replica_idx (optional, int): Index of replica to extract. If set to None, centroid of property is computed
instead (default=None).
Returns:
list(ase.Atoms): List of ASE atom objects containing molecular structures.
"""
positions = self.get_positions(
mol_idx=mol_idx, replica_idx=replica_idx
) / units.unit2internal("Angstrom")
atomic_numbers = self.get_property(
properties.Z, atomistic=True, mol_idx=mol_idx, replica_idx=replica_idx
)
cells = self.get_property(
properties.cell, atomistic=False, mol_idx=mol_idx, replica_idx=replica_idx
)
if cells is None:
cells = [None] * self.entries
else:
cells = cells / units.unit2internal("Angstrom")
all_atoms = []
log.info("Extracting structures...")
for idx in trange(self.entries):
atoms = Atoms(
atomic_numbers, positions[idx], cell=cells[idx], pbc=self.pbc[mol_idx]
)
atoms.wrap()
all_atoms.append(atoms)
return all_atoms