"""High-level interface for projections through Voronoi meshes.
Provides :class:`ProjectionCloud`, the main user-facing class that wraps
the C++ backend for grid projections, direct ray projections, and
traced projections with per-segment detail.
"""
from __future__ import annotations
import logging
from numpy.typing import ArrayLike
from .Cvortrace import ( # type: ignore
PointCloud, Projection, ReductionMode, Slice,
)
from vortrace import grid as gr
import numpy as np
_log = logging.getLogger("vortrace")
[docs]
class ProjectionCloud:
"""Main interface for projections through an unstructured Voronoi mesh.
Wraps the C++ ``Cvortrace`` backend to provide grid-based projections,
arbitrary ray projections, and single-ray segment queries. Supports
multiple fields and reduction modes (sum/integrate, max, min, volume).
"""
_REDUCTION_MAP = {
'integrate': ReductionMode.Sum,
'sum': ReductionMode.Sum,
'max': ReductionMode.Max,
'min': ReductionMode.Min,
'volume': ReductionMode.VolumeRender,
}
def __init__(self, pos: ArrayLike, fields: ArrayLike,
boundbox: list[float] | None = None,
vol: ArrayLike | None = None, *,
periodic: bool = False,
_skip_build_tree: bool = False) -> None:
"""Create a ProjectionCloud from particle data.
Parameters
----------
pos : array_like, shape (N, 3)
Particle positions.
fields : array_like, shape (N,) or (N, nfields)
Scalar field(s) to integrate (e.g. density).
boundbox : list of float, optional
Bounding box ``[xmin, xmax, ymin, ymax, zmin, zmax]``.
If *None*, derived from *pos*.
vol : array_like of shape (N,), optional
Cell volumes. When provided, the C++ backend uses them
for adaptive padding of the kDTree search radius.
periodic : bool, optional
If *True*, enable periodic boundary conditions. The
bounding box defines the periodic domain and no spatial
filtering is applied (all particles are loaded).
"""
# Store original data
self.pos_orig = np.ascontiguousarray(pos, dtype=np.float64)
self.fields_orig = np.ascontiguousarray(fields, dtype=np.float64)
self.vol_orig = np.ascontiguousarray(vol, dtype=np.float64) \
if vol is not None else None
self.periodic = periodic
fields_array = np.ascontiguousarray(self.fields_orig, dtype=np.float64)
# Determine number of fields
if fields_array.ndim == 1:
self._nfields = 1
elif fields_array.ndim == 2:
self._nfields = fields_array.shape[1]
else:
raise ValueError(
"fields must be 1D (npart,) or 2D (npart, nfields)")
if boundbox is None:
boundbox = [
pos[:, 0].min(), pos[:, 0].max(), pos[:, 1].min(),
pos[:, 1].max(), pos[:, 2].min(), pos[:, 2].max()
]
self.boundbox = boundbox
# C++ handles filtering internally
self._cloud = PointCloud()
if self.vol_orig is not None:
self._cloud.loadPoints(self.pos_orig, self.fields_orig,
boundbox, self.vol_orig, periodic)
else:
self._cloud.loadPoints(self.pos_orig, self.fields_orig,
boundbox, periodic=periodic)
if not _skip_build_tree:
self._cloud.buildTree()
# Get filtered index mapping from C++
self.orig_ids = np.array(self._cloud.get_orig_ids())
_log.info("Applied bounding box filter: %d -> %d particles",
len(self.pos_orig), len(self.orig_ids))
def _validate_reduction(self, reduction: str) -> ReductionMode:
"""Validate reduction mode and return the corresponding enum.
Raises ValueError for unknown modes or invalid field configuration.
"""
mode = self._REDUCTION_MAP.get(reduction)
if mode is None:
raise ValueError(
f"Unknown reduction {reduction!r}. "
f"Use one of {list(self._REDUCTION_MAP)}")
if mode == ReductionMode.VolumeRender:
if self._nfields != 4:
raise ValueError(
"Volume rendering requires exactly 4 fields "
f"(R, G, B, alpha), got {self._nfields}")
if not self._cloud.get_valid_rgba():
raise ValueError(
"Volume rendering requires R, G, B values in [0, 1] "
"and alpha >= 0")
return mode
def _prepare_array_for_backend(self, arr: ArrayLike) -> np.ndarray:
"""Converts input to a C-contiguous float64 numpy array if not already.
Args:
arr: The input array-like object.
Returns:
np.ndarray: A C-contiguous, float64 numpy array.
"""
np_arr = np.asarray(arr)
if np_arr.dtype != np.float64 or not np_arr.flags['C_CONTIGUOUS']:
np_arr = np.ascontiguousarray(np_arr, dtype=np.float64)
return np_arr
[docs]
def grid_projection(self, extent: ArrayLike,
nres: int | tuple[int, int],
bounds: ArrayLike,
center: ArrayLike | None, *,
proj: str | None = None,
yaw: float = 0., pitch: float = 0.,
roll: float = 0.,
reduction: str = 'integrate') -> np.ndarray:
"""Make a grid projection through the point cloud.
Args:
extent: Spatial extent ``[min, max]`` or ``[[xmin,xmax],
[ymin,ymax]]``.
nres: Number of pixels (int for square, or ``(nx, ny)``).
bounds: Integration bounds ``[z_start, z_end]``.
center: Rotation center ``(x, y, z)`` or None.
proj: Cartesian projection string (e.g. ``'xy'``).
yaw: Yaw angle in radians.
pitch: Pitch angle in radians.
roll: Roll angle in radians.
reduction: ``'integrate'``/``'sum'``, ``'max'``,
``'min'``, or ``'volume'``. Volume rendering
requires exactly 4 fields (R, G, B, alpha) and
returns 3 output channels (RGB).
Returns:
ndarray: Shape ``(nres, nres)`` for single field,
``(nres, nres, nfields)`` for multi-field, or
``(nres, nres, 3)`` for volume rendering.
"""
reduction_mode = self._validate_reduction(reduction)
pos_start, pos_end = gr.generate_projection_grid(extent, nres, bounds,
center, proj=proj,
yaw=yaw, pitch=pitch,
roll=roll)
# Flatten before feeding into backend.
orig_shape = pos_start.shape
pos_start = pos_start.reshape(-1, pos_start.shape[-1])
pos_end = pos_end.reshape(-1, pos_end.shape[-1])
# Actually do the projection using the Cvortrace backend.
proj_obj = Projection(pos_start, pos_end)
proj_obj.makeProjection(self._cloud, reduction_mode)
dat = proj_obj.returnProjection()
# Reshape before returning.
out_nfields = (3 if reduction_mode == ReductionMode.VolumeRender
else self._nfields)
if out_nfields == 1:
dat = np.reshape(dat, orig_shape[:-1])
else:
dat = np.reshape(dat, (*orig_shape[:-1], out_nfields))
return dat
[docs]
def projection(self, pos_start: ArrayLike, pos_end: ArrayLike, *,
reduction: str = 'integrate') -> np.ndarray:
"""Make a projection through the point cloud.
Args:
pos_start (array of float): Starting points of the projection.
pos_end (array of float): Ending points of the projection.
reduction (str): Reduction mode: 'integrate'/'sum', 'max', 'min',
or 'volume'. Volume rendering requires 4 fields (R, G, B,
alpha) and returns 3 channels (RGB).
Returns:
dat (array of float): The projection data. Shape ``(N,)`` when
a single field was loaded, ``(N, nfields)`` otherwise.
"""
reduction_mode = self._validate_reduction(reduction)
pos_start = self._prepare_array_for_backend(pos_start)
pos_end = self._prepare_array_for_backend(pos_end)
# ——— sanity‐check shape ———
if (pos_start.ndim != 2 or pos_end.ndim != 2 or
pos_start.shape != pos_end.shape or
pos_start.shape[1] != 3): # Ensure 3 columns for x,y,z
raise ValueError('pos_start / pos_end must be 2D arrays of '
'identical shape (N,3)')
# now safe to call into C++
proj_obj = Projection(pos_start, pos_end)
proj_obj.makeProjection(self._cloud, reduction_mode)
return proj_obj.returnProjection()
[docs]
def traced_projection(self, pos_start: ArrayLike,
pos_end: ArrayLike,
return_midpoint: bool = True, *,
reduction: str = 'integrate',
flatten: bool = False) -> tuple:
"""Projection with per-segment detail for one or more rays.
Accepts a single ray (shape ``(3,)`` or ``(1, 3)``) or a batch
of rays (shape ``(N, 3)``).
Args:
pos_start: Ray start point(s). Shape ``(3,)``, ``(1, 3)``,
or ``(N, 3)``.
pos_end: Ray end point(s), same shape as *pos_start*.
return_midpoint: If *True*, return the midpoint of each
segment; otherwise return the entry distance.
reduction: ``'integrate'``/``'sum'``, ``'max'``,
``'min'``, or ``'volume'``.
flatten: If *True*, return flat arrays with an offset
index instead of per-ray lists. Only applies to
batch (N > 1) inputs.
Returns:
For a **single ray** (input shape ``(3,)`` or ``(1, 3)``):
``(value, cell_ids, s_vals, ds_vals)``
- *value*: scalar when ``nfields == 1``, else 1-D array.
- *cell_ids*, *s_vals*, *ds_vals*: 1-D arrays.
For a **batch** with ``flatten=False`` (default):
``(values, cell_ids_list, s_vals_list, ds_vals_list)``
- *values*: shape ``(N,)`` or ``(N, nfields)``.
- *cell_ids_list*, *s_vals_list*, *ds_vals_list*: lists of
*N* arrays, one per ray.
For a **batch** with ``flatten=True``:
``(values, cell_ids, s_vals, ds_vals, offsets)``
- Flat concatenated arrays for all segments plus an
*offsets* array of length ``N + 1``.
Ray *i*'s data is at ``offsets[i]:offsets[i+1]``.
"""
pos_start_np = np.asarray(pos_start)
pos_end_np = np.asarray(pos_end)
# Detect single-ray mode.
single = False
if pos_start_np.ndim == 1 and pos_end_np.ndim == 1:
if pos_start_np.shape != (3,) or pos_end_np.shape != (3,):
raise ValueError('If 1D, pos_start and pos_end must both '
'have shape (3,)')
pos_start_np = pos_start_np[np.newaxis, :]
pos_end_np = pos_end_np[np.newaxis, :]
single = True
elif pos_start_np.ndim == 2 and pos_end_np.ndim == 2:
if (pos_start_np.shape != pos_end_np.shape or
pos_start_np.shape[1] != 3):
raise ValueError('pos_start and pos_end must be 2D arrays '
'of identical shape (N, 3)')
if pos_start_np.shape[0] == 1:
single = True
else:
raise ValueError('pos_start and pos_end must both be 1D shape '
'(3,) or both 2D shape (N, 3)')
pos_start_c = self._prepare_array_for_backend(pos_start_np)
pos_end_c = self._prepare_array_for_backend(pos_end_np)
reduction_mode = self._validate_reduction(reduction)
proj_obj = Projection(pos_start_c, pos_end_c)
proj_obj.makeDetailedProjection(self._cloud, reduction_mode)
values = proj_obj.returnProjection()
cell_ids, s_enter, s_exit, offsets = proj_obj.returnSegments()
# Map internal cell IDs to original particle IDs.
cell_ids = self.orig_ids[cell_ids]
ds_vals = s_exit - s_enter
if return_midpoint:
s_vals = (s_enter + s_exit) / 2.0
else:
s_vals = s_enter
if single:
# Unpack single-ray result to match legacy format.
if (reduction_mode == ReductionMode.VolumeRender or
self._nfields > 1):
val_out = values[0]
else:
val_out = values[0] if values.ndim == 1 else values[0, 0]
return val_out, cell_ids, s_vals, ds_vals
if flatten:
return values, cell_ids, s_vals, ds_vals, offsets
# Split flat arrays into per-ray lists.
cell_ids_list = [cell_ids[offsets[i]:offsets[i + 1]]
for i in range(len(offsets) - 1)]
s_vals_list = [s_vals[offsets[i]:offsets[i + 1]]
for i in range(len(offsets) - 1)]
ds_vals_list = [ds_vals[offsets[i]:offsets[i + 1]]
for i in range(len(offsets) - 1)]
return values, cell_ids_list, s_vals_list, ds_vals_list
[docs]
def single_projection(self, *args, **kwargs):
"""Removed. Use :meth:`traced_projection` instead."""
raise AttributeError(
"'single_projection' has been removed. "
"Use 'traced_projection' instead."
)
[docs]
def slice(self, extent: ArrayLike,
nres: int | tuple[int, int],
depth: float) -> np.ndarray:
"""Extract a 2D slice at constant depth through the point cloud.
Unlike a projection (which integrates along the line of sight), a
slice returns the nearest-cell field value at each pixel position.
Args:
extent: Spatial extent ``[xmin, xmax, ymin, ymax]``.
nres: Number of pixels (int for square, or ``(nx, ny)``).
depth: The z-coordinate of the slicing plane.
Returns:
ndarray: Shape ``(nx, ny)`` for single field,
``(nx, ny, nfields)`` for multi-field.
"""
if isinstance(nres, int):
npix = (nres, nres)
else:
npix = tuple(nres)
extent_arr = np.asarray(extent, dtype=np.float64).ravel()
if extent_arr.size != 4:
raise ValueError(
"extent must have 4 elements [xmin, xmax, ymin, ymax]")
sl = Slice(list(npix), list(extent_arr), float(depth))
sl.makeSlice(self._cloud)
dat = sl.returnSlice()
if self._nfields == 1:
dat = dat.reshape(npix[0], npix[1])
else:
dat = dat.reshape(npix[0], npix[1], self._nfields)
return dat
# ------------------------------------------------------------------
# Tree serialization helpers (used by vortrace.io)
# ------------------------------------------------------------------
[docs]
def save_tree_bytes(self) -> bytes | None:
"""Serialize the kD-tree to bytes, or *None* if not built."""
if self._cloud.get_tree_built():
return self._cloud.saveTreeToBytes()
return None
[docs]
def load_tree_bytes(self, data: bytes) -> None:
"""Restore the kD-tree from a bytes object."""
self._cloud.loadTreeFromBytes(data)
# ------------------------------------------------------------------
# Convenience I/O wrappers
# ------------------------------------------------------------------
[docs]
def save(self, filename: str, *, fmt: str = "npz",
save_tree: bool = True) -> None:
"""Save this cloud to disk.
See :func:`vortrace.io.save_cloud` for details.
"""
from vortrace.io import save_cloud
save_cloud(filename, self, fmt=fmt, save_tree=save_tree)
[docs]
@classmethod
def load(cls, filename: str) -> ProjectionCloud:
"""Load a cloud from disk.
See :func:`vortrace.io.load_cloud` for details.
"""
from vortrace.io import load_cloud
return load_cloud(filename)