Source code for flatiron_tk.physics.particle_tracking

import atexit
import dolfinx
import flatiron_tk
import numpy as np
import pyvista as pv
import os
import shutil

from flatiron_tk import PointEvaluator
from flatiron_tk import PVDWriter
from mpi4py import MPI

def _ensure_3D_velocity(velocity):
    """
    Convert 1D/2D/3D velocity into a length-3 numpy array.
    """
    velocity = np.array(velocity, dtype=np.float64).ravel()
    if velocity.size < 3:
        velocity = np.pad(velocity, (0, 3 - velocity.size), mode='constant')
    elif velocity.size > 3:
        raise ValueError('Velocity evaluation vector must be 1D, 2D, or 3D.')
    return velocity

[docs] class MasslessTracerTracker: """ Track massless particles and write per-step .vtp files plus a .pvd collection (finalized automatically via atexit). Works under MPI: only rank 0 writes the .pvd, while all ranks may write .vtp files if desired. Parameters ---------- mesh : Mesh The flatirion_tk mesh object. dt : float Time step size for particle advection. """ def __init__(self, mesh, dt): self.mesh = mesh self.dt = float(dt) self.V = dolfinx.fem.functionspace(self.mesh.msh, ('CG', 1)) self.evaluator = PointEvaluator(mesh) # Particle positions are nx3 numpy array self.particle_positions = np.zeros((0, 3), dtype=np.float64) self.step = 0 self._finalized = False self.comm = MPI.COMM_WORLD self.rank = self.comm.Get_rank() self.size = self.comm.Get_size() # Register automatic finalizer (safe to register multiple times) atexit.register(self._finalize_if_needed)
[docs] def set_writer(self, output_dir): """ Prepare output folder and PVD writer. Parameters ---------- output_dir : str Directory to write per-step files and particles.pvd. """ self.file_format = 'pvd' self.output_dir = os.path.abspath(output_dir) # Only rank 0 should create/remove the directory, then synchronize if self.rank == 0: if os.path.exists(self.output_dir): shutil.rmtree(self.output_dir) os.makedirs(self.output_dir, exist_ok=True) # wait until output_dir exists on shared FS self.comm.Barrier() # Setup PVD only on rank 0 if self.rank == 0: pvd_path = os.path.join(self.output_dir, 'particles.pvd') self.pvd_writer = PVDWriter(pvd_path)
[docs] def write(self, time_stamp=None): """ Write current particle positions to a .vtp file and add to a .pvd collection. Parameters ---------- time_stamp : float Time stamp to associate with this write in the .pvd file. """ positions = np.asarray(self.particle_positions, dtype=np.float64) positions_file = os.path.join(self.output_dir, f'positions_step_p{self.rank:04d}_{self.step:08d}.vtp') # Ensure nx3 if positions.ndim == 1: positions = positions.reshape(1, -1) if positions.shape[1] == 2: positions = np.hstack((positions, np.zeros((positions.shape[0], 1), dtype=positions.dtype))) points = pv.PolyData(positions) if positions.shape[0] > 0: points["Particle ID"] = np.arange(positions.shape[0], dtype=np.int64) # Save file try: points.save(positions_file) except Exception as e: if self.rank == 0: flatiron_tk.custom_warning_message(f'Failed to save VTP {positions_file}: {e}') # Only rank 0 adds the dataset entry to the PVD if self.rank == 0 and self.pvd_writer is not None: timestep_value = time_stamp if time_stamp is not None else self.step for r in range(self.size): file_for_rank = f"positions_step_p{r:04d}_{self.step:08d}.vtp" self.pvd_writer.add_dataset(timestep=timestep_value, file=file_for_rank) self.step += 1 self.comm.Barrier() self.step += 1 # ensure PVD updates are visible after write (rank 0 only) self.comm.Barrier()
[docs] def set_particle_positions(self, particle_positions): """ Set the particle positions directly. Parameters ---------- particle_positions : Sequence Iterable of particle positions, each a 1D array-like of length 2 or 3. """ arr = np.array(particle_positions, dtype=np.float64) if arr.ndim == 1: arr = arr.reshape(1, -1) if arr.shape[1] == 2: arr = np.hstack((arr, np.zeros((arr.shape[0], 1), dtype=arr.dtype))) self.particle_positions = arr
[docs] def set_particle_positions_from_boundary(self, boundary_id): """ Set the particle positions to all DOF coordinates on a given boundary. Parameters ---------- boundary_id : int The boundary marker ID from which to get the coordinates. """ self.particle_positions = self.get_coordinates_on_boundary(boundary_id)
[docs] def get_coordinates_on_boundary(self, boundary_id): """ Get the coordinates of all DOFs on a given boundary across all ranks. Parameters ---------- boundary_id : int The boundary marker ID from which to get the coordinates. Returns ------- coords : np.ndarray Nx3 array of coordinates on the given boundary across all ranks. """ dofs_on_boundary = dolfinx.fem.locate_dofs_topological(self.V, self.mesh.get_fdim(), self.mesh.boundary.find(boundary_id)) dof_coords = self.V.tabulate_dof_coordinates() coords_on_boundary = dof_coords[dofs_on_boundary, :] # Make nx3 if coords_on_boundary.shape[1] == 2: coords_on_boundary = np.hstack( (coords_on_boundary, np.zeros((coords_on_boundary.shape[0], 1))) ) coords_on_boundary = coords_on_boundary.astype(np.float64) # Gather all coordinates all_coords = self.comm.allgather(coords_on_boundary) all_coords = np.vstack(all_coords) if any(len(c) > 0 for c in all_coords) else np.zeros((0, 3), dtype=np.float64) if all_coords.shape[0] == 0: raise ValueError(f'No coordinates found on boundary with marker {boundary_id} across all ranks.') return all_coords
[docs] def update_particle_positions(self, current_velocity, previous_velocity=None, method='euler'): """ Update particle positions based on the current and previous velocity fields. Parameters ---------- current_velocity : dolfinx.fem.Function The current velocity field. previous_velocity : dolfinx.fem.Function The previous velocity field (required for Heun's method). method : str The time integration method to use ('euler' or 'heun'). """ un = current_velocity u0 = previous_velocity if method == "euler": _, vel_n = self.evaluator.evaluate_set(un, self.particle_positions) for pos, vel in zip(self.particle_positions, vel_n): if vel is not None: vel = _ensure_3D_velocity(vel) pos += vel * self.dt elif method == "heun": if u0 is None: raise ValueError('previous velocity must be provided for Heun\'s method.') _, vel_0 = self.evaluator.evaluate_set(u0, self.particle_positions) for pos, vel0 in zip(self.particle_positions, vel_0): if vel0 is not None: vel0 = _ensure_3D_velocity(vel0) predictor_pos = pos + vel0 * self.dt vel_pred = self.evaluator.evaluate_point(un, predictor_pos, show_warning=False) if vel_pred is None: vel_pred = np.zeros(3) else: vel_pred = _ensure_3D_velocity(vel_pred) pos += 0.5 * (vel0 + vel_pred) * self.dt
[docs] def inject_particles(self, new_particle_positions): """ Inject new particles at specified positions. Parameters ---------- new_particle_positions : Sequence Iterable of new particle positions, each a 1D array-like of length 2 or 3. """ arr = np.array(new_particle_positions, dtype=np.float64) if arr.ndim == 1: arr = arr.reshape(1, -1) if arr.shape[1] == 2: arr = np.hstack((arr, np.zeros((arr.shape[0], 1), dtype=arr.dtype))) if self.particle_positions.size == 0: self.particle_positions = arr else: self.particle_positions = np.vstack((self.particle_positions, arr))
[docs] def inject_particles_from_boundary(self, boundary_id): boundary_coords = self.get_coordinates_on_boundary(boundary_id) self.inject_particles(boundary_coords)
def _finalize_if_needed(self): """ Automatically called at program exit to write the .pvd file on rank 0. """ try: if getattr(self, "_finalized", False): return if not hasattr(self, 'file_format') or self.file_format not in ('vtp', 'pvd'): # nothing to do self._finalized = True return # only rank 0 writes the pvd if self.rank == 0 and getattr(self, 'pvd_writer', None) is not None: try: self.pvd_writer.write() except Exception as e: flatiron_tk.custom_warning_message(f'Failed to finalize particle PVD file: {e}') # mark finalized on all ranks self._finalized = True except Exception: # Never fail on exit hooks pass