Source code for pygra.dataset
"""
dataset.py — data loading and transform operations
"""
import csv
from pathlib import Path
import numpy as np
from scipy.signal import savgol_filter
def _can_float(s: str) -> bool:
    """
    Report whether *s* can be converted by the built-in ``float``.

    Parameters
    ----------
    s : str
        Candidate token.

    Returns
    -------
    bool
        ``True`` if ``float(s)`` succeeds, ``False`` if it raises
        ``ValueError``.
    """
    try:
        float(s)
    except ValueError:
        return False
    else:
        return True
[docs]
class DataSet:
    """
    Load a delimited numeric data file into a NumPy array.

    The delimiter is detected from the first non-comment, non-empty line:
    if that line contains a comma the file is parsed as CSV, otherwise it
    is split on whitespace.

    Lines beginning with ``#`` and blank lines are silently ignored.
    The first data-bearing line is treated as a header and dropped without
    being recorded when none of its tokens is numeric. Any other row that
    cannot be fully converted to floats, or whose number of columns differs
    from the first valid row, is skipped and recorded in
    :attr:`skipped_rows`.

    Parameters
    ----------
    path : str
        Absolute or relative path to the data file.
    step : int, optional
        Keep every *step*-th row of :attr:`arr` (default 1, no
        downsampling). Values less than or equal to 1 disable downsampling.

    Attributes
    ----------
    path : str
        Path passed to the constructor.
    name : str
        Filename component of *path* (basename).
    downsample_step : int
        The *step* value passed to the constructor.
    raw : list of list of float
        Parsed data rows as a nested list, populated during loading.
        Downsampling is applied to :attr:`arr` only, not to this list.
    arr : numpy.ndarray
        2-D float64 array of shape ``(nrows, ncols)``.
        Shape is ``(0, 0)`` when no valid rows were found.
    skipped_rows : list of tuple[int, str]
        ``(line_number, raw_content)`` for every row that could not be
        parsed. Line numbers are 1-based; the content is the stripped line.
    """

    def __init__(self, path: str, step: int = 1):
        """
        Parameters
        ----------
        path : str
            Path to the data file to load.
        step : int, optional
            Load every *step*-th row (default 1, no downsampling).
        """
        self.path = path
        self.name = Path(path).name
        self.raw: list = []
        self.downsample_step = step
        self._load()
        if step > 1:
            self.arr = self.arr[::step]

    def _load(self):
        """
        Read the file at :attr:`path` and populate :attr:`raw`,
        :attr:`arr` and :attr:`skipped_rows`.
        """
        # Start from a clean slate so a repeated call cannot append
        # duplicate rows to a previously filled list.
        self.raw = []
        self.skipped_rows: list[tuple[int, str]] = []

        with open(self.path) as f:
            lines = f.readlines()

        # Detect delimiter from the first non-comment, non-empty line.
        use_csv = False
        for raw_line in lines:
            stripped = raw_line.strip()
            if stripped and not stripped.startswith('#'):
                use_csv = ',' in stripped
                break

        header_consumed = False
        expected_cols = None  # width of the first successfully parsed row
        for lineno, raw_line in enumerate(lines, start=1):
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue

            if use_csv:
                tokens = [t.strip() for t in next(csv.reader([line]))]
            else:
                tokens = line.split()

            try:
                row = [float(t) for t in tokens]
            except ValueError:
                # Only the first data-bearing line may be a header, and only
                # if it contains no numeric token at all.
                if not header_consumed and not any(_can_float(t) for t in tokens):
                    header_consumed = True
                    continue
                self.skipped_rows.append((lineno, line))
                continue

            header_consumed = True
            if expected_cols is None:
                expected_cols = len(row)
            elif len(row) != expected_cols:
                # A ragged row would make np.array fail or yield an object
                # array; treat it like any other unparseable row.
                self.skipped_rows.append((lineno, line))
                continue
            self.raw.append(row)

        if self.raw:
            self.arr = np.array(self.raw, dtype=float)
        else:
            self.arr = np.empty((0, 0))

    @property
    def ncols(self) -> int:
        """
        Number of columns in the loaded data.

        Returns
        -------
        int
            ``arr.shape[1]`` when the array is 2-D and non-empty,
            otherwise ``0``.
        """
        return self.arr.shape[1] if self.arr.ndim == 2 and self.arr.size > 0 else 0

    @property
    def nrows(self) -> int:
        """
        Number of data rows.

        Returns
        -------
        int
            ``arr.shape[0]``.
        """
        return self.arr.shape[0]

    def col(self, idx: int):
        """
        Return a copy of column *idx*.

        Parameters
        ----------
        idx : int
            Column index (0-based). Negative indices are treated as out of
            range.

        Returns
        -------
        numpy.ndarray or None
            Copy of the column as a 1-D array, or ``None`` if *idx* is out
            of range.
        """
        if idx < 0 or idx >= self.ncols:
            return None
        return self.arr[:, idx].copy()
[docs]
def apply_transform(dataset: DataSet, cfg: dict) -> np.ndarray:
    """
    Apply a transform operation to one column of a DataSet.

    Parameters
    ----------
    dataset : DataSet
        The dataset to modify. ``dataset.arr`` is replaced by a widened
        array when ``cfg["new_col"]`` is true; the target column is
        overwritten in place when it is false.
    cfg : dict
        Transform configuration with the following keys:

        ``"col"`` : int
            Target column index.
        ``"op"`` : str
            Operation name. Recognised values are
            ``"multiply by constant"``, ``"divide by constant"``,
            ``"add constant"``, ``"subtract constant"``,
            ``"normalize by max"``, ``"normalize by value"``,
            ``"numerical derivative (dy/dx)"`` and ``"moving average"``.
        ``"val"`` : float
            Scalar operand for the constant and ``"normalize by value"``
            operations, or window size for the moving average. Not read
            for ``"normalize by max"`` or the numerical derivative.
        ``"xcol"`` : int, optional
            x-column index; required when *op* is
            ``"numerical derivative (dy/dx)"``.
        ``"new_col"`` : bool, optional
            If ``True`` (default) the result is appended as a new column;
            if ``False`` it overwrites column ``"col"``.

    Returns
    -------
    numpy.ndarray
        1-D array containing the transform result.

    Raises
    ------
    ValueError
        If the target (or x) column does not exist, if division or
        normalisation by zero is attempted, if a moving average is
        requested on fewer than 3 data points, or if *op* is not a
        recognised operation.
    KeyError
        If a key required by the selected operation is missing from *cfg*.
    """
    col = dataset.col(cfg["col"])
    if col is None:
        raise ValueError(f"Column {cfg['col']} does not exist.")

    op = cfg["op"]

    # "val" is looked up only inside the branches that use it, so that
    # operations which ignore it do not fail when the key is absent.
    if op == "multiply by constant":
        result = col * cfg["val"]
    elif op == "divide by constant":
        val = cfg["val"]
        if val == 0:
            raise ValueError("Cannot divide by zero.")
        result = col / val
    elif op == "add constant":
        result = col + cfg["val"]
    elif op == "subtract constant":
        result = col - cfg["val"]
    elif op == "normalize by max":
        # Normalise by the largest magnitude, so the result lies in [-1, 1].
        m = np.max(np.abs(col))
        if m == 0:
            raise ValueError("Max is zero, cannot normalize.")
        result = col / m
    elif op == "normalize by value":
        val = cfg["val"]
        if val == 0:
            raise ValueError("Cannot normalize by zero.")
        result = col / val
    elif op == "numerical derivative (dy/dx)":
        xcol = dataset.col(cfg["xcol"])
        if xcol is None:
            raise ValueError(f"x column {cfg['xcol']} does not exist.")
        result = np.gradient(col, xcol)
    elif op == "moving average":
        val = cfg["val"]
        n = len(col)
        # The smallest usable window is 3 (odd and greater than polyorder=1);
        # shorter columns cannot be smoothed.
        if n < 3:
            raise ValueError("Moving average requires at least 3 data points.")
        w = max(3, int(val))
        if w % 2 == 0:
            w += 1  # force an odd window length
        # Clamp to the largest odd length that fits inside the column.
        largest_odd = n if n % 2 != 0 else n - 1
        wl = min(w, largest_odd)
        result = savgol_filter(col, window_length=wl, polyorder=1)
    else:
        raise ValueError(f"Unknown operation: {op}")

    if cfg.get("new_col", True):
        dataset.arr = np.column_stack([dataset.arr, result])
    else:
        dataset.arr[:, cfg["col"]] = result
    return result