Source code for pynxxas.io.xdi
"""XAS Data Interchange (XDI) file format
"""
import re
import datetime
from typing import Union, Tuple, Optional, Generator
import pint
import numpy
from . import url_utils
from ..models import units
from ..models.xdi import XdiModel
[docs]
def is_xdi_file(url: url_utils.UrlType) -> bool:
    """Tell whether the file behind ``url`` looks like an XDI file.

    The check only inspects the first non-empty line of the file, which
    must start with ``"# XDI"``.

    :param url: location of the file; only the path component of the URL
        is used to open it.
    :returns: ``True`` when the first non-empty line starts with
        ``"# XDI"``. ``False`` otherwise, including when the file has no
        non-empty line or when reading it fails.
    """
    filename = url_utils.as_url(url).path
    with open(filename, "r") as file:
        try:
            for line in file:
                line = line.strip()
                if line:
                    # Only the first non-empty line decides.
                    return line.startswith("# XDI")
        except Exception:
            # Best effort: a file that cannot be read as text
            # (e.g. a decoding error) is simply not an XDI file.
            return False
    # Empty file or only blank lines: previously fell through and
    # returned None instead of a boolean.
    return False
[docs]
def load_xdi_file(url: url_utils.UrlType) -> Generator[XdiModel, None, None]:
    """Parse an XDI file and yield its content as one ``XdiModel``.

    Specs described in
    https://github.com/XraySpectroscopy/XAS-Data-Interchange/blob/master/specification/spec.md

    The file is read in three stages:

    1. the first non-empty line must start with ``"# XDI"``;
    2. the following ``#`` lines are parsed as ``Namespace.tag: value``
       fields until the fields-end marker, after which they are stored
       as user comments, until the header-end marker;
    3. the numerical table is loaded with ``numpy.loadtxt`` and each
       table column is associated with the name declared in the
       ``column`` namespace of the header.

    :param url: location of the file; only the path component of the URL
        is used to open it.
    :raises ValueError: when the first non-empty line does not start with
        ``"# XDI"`` or when a line before the header-end marker does not
        start with ``"#"``.
    """
    filename = url_utils.as_url(url).path
    content = {"comments": [], "column": dict(), "data": dict()}

    with open(filename, "r") as file:
        # Version: first non-empty line
        for line in file:
            line = line.strip()
            if not line:
                continue
            if not line.startswith("# XDI"):
                raise ValueError(f"XDI file does not start with '# XDI': '{filename}'")
            break

        # Fields and comments: lines starting with "#"
        is_comment = False
        for line in file:
            line = line.strip()
            if not line.startswith("#"):
                raise ValueError(f"Invalid XDI header line: '{line}'")
            if _XDI_HEADER_END_REGEX.match(line):
                break
            if _XDI_FIELDS_END_REGEX.match(line):
                # Next lines in the header are user comments
                is_comment = True
                continue

            if is_comment:
                match_comment = _XDI_COMMENT_REGEX.match(line)
                if not match_comment:
                    continue
                (comment,) = match_comment.groups()
                content["comments"].append(comment)
                continue

            match_namespace = _XDI_FIELD_REGEX.match(line)
            if match_namespace:
                key, value = match_namespace.groups()
                value = _parse_xdi_value(value)
                key_parts = key.split(".")
                if len(key_parts) > 1:
                    # "Namespace.tag: value" -> content[namespace][tag]
                    namespace, key = key_parts
                    namespace = namespace.lower()
                    key = key.lower()
                    key = _parse_xdi_value(key)
                    if namespace not in content:
                        content[namespace] = {}
                    content[namespace][key] = value
                else:
                    # "tag: value" without namespace -> content[tag]
                    key = key_parts[0]
                    key = _parse_xdi_value(key)
                    content[key] = value

    # Data
    # ndmin=2 keeps the table two-dimensional (rows x columns) even when
    # the file holds a single row or a single column; without it
    # numpy.loadtxt squeezes the array and iterating over `table.T`
    # would yield scalars instead of one array per column.
    table = numpy.loadtxt(filename, dtype=float, ndmin=2)

    # Column names ordered by their key in the "column" namespace.
    columns = [
        name
        for _, name in sorted(content.pop("column").items(), key=lambda tpl: tpl[0])
    ]

    # An empty table provides no column data at all.
    if table.size:
        for name, array in zip(columns, table.T):
            name, quant = _parse_xdi_column_name(name)
            content["data"][name] = array, quant

    yield XdiModel(**content)
[docs]
def save_xdi_file(model_instance: XdiModel, url: url_utils.UrlType) -> None:
    """Placeholder for writing a model to an XDI file.

    :param model_instance: the model that would be saved.
    :param url: the destination that would be written to (unused).
    :raises NotImplementedError: always; the message names the type of
        ``model_instance``.
    """
    model_type_name = type(model_instance).__name__
    message = f"Saving of {model_type_name} not implemented"
    raise NotImplementedError(message)
# Header field line: "#", optional whitespace, a key made of word
# characters and dots, a colon, optional whitespace and the value.
# Captures (key, value).
_XDI_FIELD_REGEX = re.compile(r"#\s*([\w.]+):\s*(.*)")
# User comment line: captures everything after "#" and optional whitespace.
_XDI_COMMENT_REGEX = re.compile(r"#\s*(.*)")
# End of the header: "#", optional whitespace, then a dash.
_XDI_HEADER_END_REGEX = re.compile(r"#\s*-")
# End of the fields section: "#", optional whitespace, then "///".
# Header lines after this marker are treated as user comments.
_XDI_FIELDS_END_REGEX = re.compile(r"#\s*///")
# Optionally signed decimal number with optional exponent, followed by
# whitespace and at least one word character. Used to decide whether a
# value is worth parsing as a number with units.
_NUMBER_REGEX = re.compile(r"(?=.)([+-]?([0-9]*)(\.([0-9]+))?)([eE][+-]?\d+)?\s+\w+")
# One or more whitespace characters; used to split column names.
_SPACES_REGEX = re.compile(r"\s+")
def _parse_xdi_value(
    value: str,
) -> Union[str, datetime.datetime, pint.Quantity, Tuple[str, pint.Quantity]]:
    """Convert the text of an XDI header value to the richest type possible.

    The interpretations are tried in this order and the first one that
    succeeds is returned:

    1. integral number, as a quantity;
    2. decimal number, as a quantity;
    3. ISO formatted date and time;
    4. number followed by units, as a quantity.

    When none applies the text is returned unchanged.
    """
    # Plain numbers: integral first, then decimal.
    for to_number in (int, float):
        try:
            return units.as_quantity(to_number(value))
        except ValueError:
            continue

    # Date and time
    try:
        return datetime.datetime.fromisoformat(value)
    except ValueError:
        pass

    # Anything that does not look like "<number> <word>" stays a string.
    if not _NUMBER_REGEX.match(value):
        return value

    # Number with units; unknown units leave the value as a string.
    try:
        return units.as_quantity(value)
    except pint.UndefinedUnitError:
        return value
def _parse_xdi_column_name(
    name: str,
) -> Union[Tuple[str, Optional[str]]]:
    """Split a column description into its label and its units.

    The last whitespace-separated word is treated as units when
    ``units.as_units`` accepts it. In that case the remaining words,
    joined by single spaces, form the label.

    :returns: ``(label, units)``, or ``(name, None)`` when ``name`` is a
        single word or its last word is not a defined unit.
    """
    *label_words, candidate_units = _SPACES_REGEX.split(name)
    if not label_words:
        # Single word: nothing that could be units.
        return name, None

    try:
        units.as_units(candidate_units)
    except pint.UndefinedUnitError:
        return name, None

    return " ".join(label_words), candidate_units