"""NeXus/HDF5 file format
"""
from typing import Generator, Any, Tuple
import h5py
import pint
import pydantic
from . import url_utils
from . import hdf5_utils
from ..models import nexus
[docs]
def is_nexus_file(url: url_utils.UrlType) -> bool:
filename = url_utils.as_url(url).path
with open(filename, "rb") as file:
try:
with h5py.File(file, mode="r"):
return True
except Exception:
return False
[docs]
def load_nexus_file(url: url_utils.UrlType) -> Generator[nexus.NxGroup, None, None]:
raise NotImplementedError(f"File format not supported: {url}")
[docs]
def save_nexus_file(nxgroup: nexus.NxXasModel, url: url_utils.UrlType) -> None:
if not isinstance(nxgroup, nexus.NxXasModel):
raise TypeError(f"nxgroup is not of type NxXasModel ({type(nxgroup)})")
if not nxgroup.has_data():
return
filename = url_utils.as_url(url).path
url = url_utils.as_url(url)
with h5py.File(filename, mode="a", track_order=True) as nxroot:
nxparent = _prepare_nxparent(nxgroup, url, nxroot)
_save_nxgroup(nxgroup, nxparent)
def _save_nxgroup(nxgroup: nexus.NxGroup, nxparent: h5py.Group) -> None:
if not isinstance(nxgroup, nexus.NxGroup):
raise TypeError(f"nxgroup is not of type NxGroup ({type(nxgroup)})")
for field_name, field, field_value in _iter_model_fields(nxgroup):
if field_value is None:
continue
elif isinstance(field_value, nexus.NxGroup):
nxchild = nxparent.require_group(field_name)
_save_nxgroup(field_value, nxchild)
if isinstance(field_value, nexus.NxDataModel):
_set_default(nxchild)
elif field.alias and field.alias.startswith("@"):
try:
_save_attribute(nxparent, field_name, field_value)
except Exception as e:
raise ValueError(
f"{field_name} = {field_value} ({type(field_value)}) cannot be saved as an HDF5 attribute"
) from e
else:
try:
_save_dataset(nxparent, field_name, field_value)
except Exception as e:
raise ValueError(
f"{field_name} = {field_value} ({type(field_value)}) cannot be saved as an HDF5 dataset"
) from e
def _iter_model_fields(
model: pydantic.BaseModel,
) -> Generator[Tuple[str, pydantic.Field, Any], None, None]:
for field_name, field in model.__fields__.items():
field_value = getattr(model, field_name)
yield field_name, field, field_value
def _save_dataset(nxparent: h5py.Group, field_name: str, field_value: Any) -> None:
if isinstance(field_value, nexus.NxField):
nxparent[field_name] = field_value.value
for attr_name, attr, attr_value in _iter_model_fields(field_value):
if attr.alias and attr.alias.startswith("@"):
nxparent[field_name].attrs[attr_name] = attr_value
elif isinstance(field_value, pint.Quantity):
if field_value.size:
nxparent[field_name] = field_value.magnitude
units = str(field_value.units)
if units:
nxparent[field_name].attrs["units"] = units
elif isinstance(field_value, nexus.NxLinkModel):
link = hdf5_utils.create_hdf5_link(
nxparent, field_value.target_name, field_value.target_filename
)
nxparent[field_name] = link
else:
nxparent[field_name] = field_value
def _save_attribute(nxparent: h5py.Group, field_name: str, field_value: Any) -> None:
nxparent.attrs[field_name] = field_value
def _set_default(h5group: h5py.Group) -> None:
while h5group.name != "/":
h5group.parent.attrs["default"] = h5group.name.split("/")[-1]
h5group = h5group.parent
def _prepare_nxparent(
nxgroup: nexus.NxGroup,
url: url_utils.ParsedUrlType,
nxroot: h5py.File,
) -> h5py.Group:
"""Creates and returns the parent group of `nxgroup`"""
internal_path = url_utils.as_url(url).internal_path
parts = [s for s in internal_path.split("/") if s]
nparts = len(parts)
if nxgroup.NX_class == "NXroot":
if nparts != 0:
raise ValueError(
f"NXroot URL cannot have an internal path ({internal_path})"
)
nxclasses = []
elif nxgroup.NX_class == "NXentry":
if nparts != 1:
raise ValueError(
f"NXentry URL must have an internal path of 1 level deep ({internal_path})"
)
nxclasses = ["NXentry"]
elif nxgroup.NX_class == "NXsubentry":
if nparts != 2:
raise ValueError(
f"NXsubentry URL must have an internal path of 2 levels deep ({internal_path})"
)
nxclasses = ["NXentry", "NXsubentry"]
else:
nxclasses = ["NXentry"] + ["NXsubentry"] * (len(parts) - 1)
nxroot.attrs.setdefault("NX_class", "NXroot")
nxparent = nxroot
for part, nxclass in zip(parts, nxclasses):
nxparent = nxparent.require_group(part)
nxparent.attrs.setdefault("NX_class", nxclass)
return nxparent