From 9fb198d02c1c1582e3a255cacf227ebc5a5a9a1e Mon Sep 17 00:00:00 2001 From: lmoresi Date: Wed, 29 Apr 2026 11:06:55 +1000 Subject: [PATCH 1/3] Replace read-everywhere checkpoint reads with swarm-routed reads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MeshVariable.read_timestep and SwarmVariable.read_timestep both used to have every rank h5py-open the saved file and build a rank-local KDTree over the *full* saved field. PR #146 measured 3.92 TB resident at 1152 ranks for a 1/128 spherical case driven by exactly this pattern. Both methods now use a transient swarm to route saved (coord, value) pairs to the rank that owns each location. Per-rank memory is bounded by file_size / n_ranks instead of file_size per rank. Two pieces: * Swarm._route_by_nearest_centroid() (new, swarm.py) — deterministic centroid-distance routing. Bypasses Swarm.migrate's points_in_domain test, which can return True on multiple ranks for vertex DOFs sitting on a partition boundary (owner + ghost). With this routing rule the destination is a pure function of the coordinate, so a saved point and a query at the same coord always land on the same rank. * MeshVariable.read_timestep — round-trip pattern with two transient swarms. Source swarm carries (coord, saved_value); rank 0 reads the file once and ships chunks via the centroid router. Query swarm carries each rank's live DOF coordinates, also routed by centroid. The interpolation runs rank-local against the landed source data, then results migrate back to the live DOF's home rank via the bare DMSwarm migrate idiom (rank-stamped, no validation). * SwarmVariable.read_timestep — same recipe, single direction (the live swarm's particles already have a known home; the source data is routed to where the live particles landed, then a rank-local KDTree fills in the values). The serial round-trip tests (tests/test_0003_save_load.py) pass exactly as before. New parallel test ptest_0762_read_timestep_swarm_routed.py exercises the MeshVariable parallel round-trip on 2 ranks; the matching SwarmVariable parallel test is skipped because Swarm.write_timestep itself hangs in parallel — a pre-existing issue unrelated to read. Underworld development team with AI support from Claude Code --- .../discretisation_mesh_variables.py | 185 ++++++++++++------ src/underworld3/swarm.py | 105 +++++++--- .../ptest_0762_read_timestep_swarm_routed.py | 100 ++++++++++ 3 files changed, 303 insertions(+), 87 deletions(-) create mode 100644 tests/parallel/ptest_0762_read_timestep_swarm_routed.py diff --git a/src/underworld3/discretisation/discretisation_mesh_variables.py b/src/underworld3/discretisation/discretisation_mesh_variables.py index 13359073..429f0793 100644 --- a/src/underworld3/discretisation/discretisation_mesh_variables.py +++ b/src/underworld3/discretisation/discretisation_mesh_variables.py @@ -70,6 +70,8 @@ def wrapper(final): return wrapper + + class _BaseMeshVariable(Stateful, uw_object): """ The MeshVariable class generates a variable supported by a finite element mesh and the @@ -1155,90 +1157,145 @@ def read_timestep( ): """ Read a mesh variable from an arbitrary vertex-based checkpoint file - and reconstruct/interpolate the data field accordingly. The data sizes / meshes can be - different and will be matched using a kd-tree / inverse-distance weighting - to the new mesh. + and reconstruct/interpolate the data field accordingly. 
The saved + mesh and the live mesh may have different sizes/decompositions; the + values are matched by nearest-neighbour kd-tree interpolation to + the live mesh nodes. - """ + Parallel-safe and memory-bounded. Two transient swarms route the + work without ever holding the full file on more than one rank: + + 1. **Source swarm** — rank 0 reads the file; saved + ``(coord, value)`` pairs migrate to whichever rank owns the + centroid-domain of each location. - # Fix this to match the write_timestep function + 2. **Query swarm** — each rank inserts *its own* live DOF + coordinates. They migrate using the same centroid logic, so + a live DOF and a saved point at the same coordinate land on + the same rank regardless of how PETSc partitioned the DM. + Each rank then runs a rank-local KDTree against the saved + data it received, and the interpolated values migrate back to + the live DOF's home rank. - # mesh.write_timestep( "test", meshUpdates=False, meshVars=[X], outputPath="", index=0) - # swarm.write_timestep("test", "swarm", swarmVars=[var], outputPath="", index=0) + Per-rank memory is bounded by ``file_size / n_ranks`` rather than + ``file_size`` per rank. + """ output_base_name = os.path.join(outputPath, data_filename) data_file = output_base_name + f".mesh.{data_name}.{index:05}.h5" - # check if data_file exists - if os.path.isfile(os.path.abspath(data_file)): - pass - else: + if not os.path.isfile(os.path.abspath(data_file)): raise RuntimeError(f"{os.path.abspath(data_file)} does not exist") import h5py import numpy as np - # Keep vector available for future access - pass - - ## Sub functions that are used to read / interpolate the mesh. - def field_from_checkpoint( - data_file=None, - data_name=None, - ): - """Read the mesh data as a swarm-like value""" + n_components = self.shape[1] + dim = self.mesh.dim + + # ---- Phase 1: source swarm carries saved (coord, value) pairs ---- + source_swarm = uw.swarm.Swarm(self.mesh) + saved = uw.swarm.SwarmVariable( + "_read_timestep_saved", + source_swarm, + vtype=uw.VarType.MATRIX, + size=(1, n_components), + dtype=float, + _proxy=False, + varsymbol=r"\cal{S}", + ) - if verbose and uw.mpi.rank == 0: + if uw.mpi.rank == 0: + if verbose: print(f"Reading data file {data_file}", flush=True) + with h5py.File(data_file, "r") as h5f: + X_src = h5f["fields"]["coordinates"][()].reshape(-1, dim) + D_src = h5f["fields"][data_name][()].reshape(-1, n_components) + else: + X_src = np.empty((0, dim), dtype=np.double) + D_src = np.empty((0, n_components), dtype=np.double) + + src_size_before = max(source_swarm.dm.getLocalSize(), 0) + source_swarm.add_particles_with_global_coordinates(X_src, migrate=False) + source_swarm._invalidate_canonical_data() + saved.array[src_size_before:, 0, :] = D_src[:, :] + # Deterministic centroid-distance routing: nearest rank-centroid + # owns the point. Both swarms (source + query below) use the same + # rule, so a saved point at coord X and a live-DOF query at the + # same X always land on the same rank — exact match restored. + # ``Swarm.migrate``'s ``points_in_domain`` test isn't enough on its + # own: at partition boundaries it can return True on multiple ranks + # (vertex DOFs are shared) and source/query end up apart. 
+ source_swarm._route_by_nearest_centroid() + + landed_X = source_swarm._particle_coordinates.array[...].reshape(-1, dim) + landed_D = saved.array[:, 0, :] + + # ---- Phase 2: query swarm round-trips live DOFs to source rank ---- + query_coords = self.coords + if hasattr(query_coords, "magnitude"): + query_coords = query_coords.magnitude + n_query_local = query_coords.shape[0] + original_index = np.arange(n_query_local).reshape(-1, 1, 1) + + query_swarm = uw.swarm.Swarm(self.mesh) + origin_rank = uw.swarm.SwarmVariable( + "rank", query_swarm, + vtype=uw.VarType.SCALAR, dtype=int, _proxy=False, + varsymbol=r"\cal{R}_o", + ) + origin_index_var = uw.swarm.SwarmVariable( + "index", query_swarm, + vtype=uw.VarType.SCALAR, dtype=int, _proxy=False, + varsymbol=r"\cal{I}", + ) + result = uw.swarm.SwarmVariable( + "_read_timestep_result", query_swarm, + vtype=uw.VarType.MATRIX, size=(1, n_components), + dtype=float, _proxy=False, varsymbol=r"\cal{D}", + ) - h5f = h5py.File(data_file) - D = h5f["fields"][data_name][()].reshape(-1, self.shape[1]) - X = h5f["fields"]["coordinates"][()].reshape(-1, self.mesh.dim) - - h5f.close() - - if len(D.shape) == 1: - D = D.reshape(-1, 1) - - return X, D - - def map_to_vertex_values(X, D, nnn=4, p=2, verbose=False): - # Map from "swarm" of points to nodal points - # This is a permutation if we building on the checkpointed - # mesh file - - mesh_kdt = uw.kdtree.KDTree(X) - - # Strip pint units from query coords — the KDTree was built - # from plain HDF5 floats (same physical units, no metadata). - query_coords = self.coords - if hasattr(query_coords, "magnitude"): - query_coords = query_coords.magnitude - - return mesh_kdt.rbf_interpolator_local(query_coords, D, nnn, p, verbose) - - def values_to_mesh_var(mesh_variable, Values): - mesh = mesh_variable.mesh - - # This should be trivial but there may be problems if - # the kdtree does not have enough neighbours to allocate - # values for every point. We handle that here. - - mesh_variable.data[...] = Values[...] + q_size_before = max(query_swarm.dm.getLocalSize(), 0) + query_swarm.add_particles_with_global_coordinates(query_coords, migrate=False) + query_swarm._invalidate_canonical_data() + origin_rank.array[q_size_before:, 0, 0] = uw.mpi.rank + origin_index_var.array[q_size_before:, 0, 0] = original_index[:, 0, 0] - return + # Forward: live DOF coords go to the rank whose centroid is + # closest — same deterministic rule as the source swarm above. + query_swarm._route_by_nearest_centroid() - ## Read file information + local_query = query_swarm._particle_coordinates.array[...].reshape(-1, dim) - X, D = field_from_checkpoint( - data_file, - data_name, - ) + if landed_X.shape[0] > 0 and local_query.shape[0] > 0: + kdt = uw.kdtree.KDTree(landed_X) + # ``nnn=1`` — exact match for round-trip reads, sensible + # nearest-neighbour fallback for cross-mesh reads. + result.array[:, 0, :] = kdt.rbf_interpolator_local( + local_query, landed_D, 1, 2, verbose + ) + elif local_query.shape[0] > 0: + # No saved data landed on this rank — leave query payload zero + # and warn; callers can detect via a missing-rank pattern. + if verbose: + print( + f"[rank {uw.mpi.rank}] read_timestep: no saved points landed; " + f"queries from this rank will receive zeros", + flush=True, + ) - remapped_D = map_to_vertex_values(X, D) + # Reverse: stamp the destination rank from origin_rank and use the + # bare DMSwarm migrate to send each query particle back home. + query_swarm._rank_var.array[...] = origin_rank.array[...] 
+ query_swarm.dm.migrate(remove_sent_points=True) + uw.mpi.barrier() + query_swarm._invalidate_canonical_data() - # This is empty at the moment - values_to_mesh_var(self, remapped_D) + # Reorder by original_index and write into self.data + idx = origin_index_var.array[:, 0, 0] + out = np.zeros((n_query_local, n_components), dtype=np.double) + out[idx, :] = result.array[:, 0, :] + self.data[...] = out return diff --git a/src/underworld3/swarm.py b/src/underworld3/swarm.py index 83fadb38..7e352ca8 100644 --- a/src/underworld3/swarm.py +++ b/src/underworld3/swarm.py @@ -1910,37 +1910,66 @@ def read_timestep( else: raise RuntimeError(f"{os.path.abspath(filename)} does not exist") - ### open up file with coords on all procs and open up data on all procs. May be problematic for large problems. - with ( - h5py.File(f"{filename}", "r") as h5f_data, - h5py.File(f"{swarmFilename}", "r") as h5f_swarm, - ): - - # with self.swarm.access(self): - var_dtype = self.dtype - file_dtype = h5f_data["data"][:].dtype - file_length = h5f_data["data"][:].shape[0] - - if var_dtype != file_dtype: - if comm.rank == 0: + # Memory-bounded parallel read: rank 0 streams coords + values from + # disk into a transient routing swarm; ``swarm.migrate`` ships each + # (coord, value) pair to the rank that owns its location; each rank + # then runs a small rank-local KDTree to fill the live swarm's + # particles. Per-rank memory scales as ``file_size / n_ranks``. + + n_components = self.num_components + dim = self.swarm.mesh.dim + + if uw.mpi.rank == 0: + with ( + h5py.File(f"{filename}", "r") as h5f_data, + h5py.File(f"{swarmFilename}", "r") as h5f_swarm, + ): + file_dtype = h5f_data["data"].dtype + if self.dtype != file_dtype: warnings.warn( - f"{os.path.basename(filename)} dtype ({file_dtype}) does not match {self.name} swarm variable dtype ({var_dtype}) which may result in a loss of data.", + f"{os.path.basename(filename)} dtype ({file_dtype}) " + f"does not match {self.name} swarm variable dtype " + f"({self.dtype}) which may result in a loss of data.", stacklevel=2, ) + X_chunk = h5f_swarm["coordinates"][()].reshape(-1, dim) + D_chunk = h5f_data["data"][()].reshape(-1, n_components) + else: + X_chunk = np.empty((0, dim), dtype=np.double) + D_chunk = np.empty((0, n_components), dtype=np.double) + + tmp_swarm = uw.swarm.Swarm(self.swarm.mesh) + saved = SwarmVariable( + "_read_timestep_saved", + tmp_swarm, + vtype=uw.VarType.MATRIX, + size=(1, n_components), + dtype=float, + _proxy=False, + varsymbol=r"\cal{S}", + ) - # First work out which are local points and ignore the rest - # This might help speed up the load by dropping lots of particles - - all_coords = h5f_swarm["coordinates"][()] - all_data = h5f_data["data"][()] + size_before = max(tmp_swarm.dm.getLocalSize(), 0) + tmp_swarm.add_particles_with_global_coordinates(X_chunk, migrate=False) + tmp_swarm._invalidate_canonical_data() + saved.array[size_before:, 0, :] = D_chunk[:, :] - local_coords = all_coords # [local] - local_data = all_data # [local] + # Deterministic centroid-distance routing — see Swarm._route_by_nearest_centroid. 
+ tmp_swarm._route_by_nearest_centroid() - kdt = uw.kdtree.KDTree(local_coords) + landed_X = tmp_swarm._particle_coordinates.array[...].reshape(-1, dim) + landed_D = saved.array[:, 0, :] + if landed_X.shape[0] == 0: + warnings.warn( + f"[rank {uw.mpi.rank}] read_timestep: no saved swarm points " + f"landed locally; '{self.name}' on this rank will not be updated", + stacklevel=2, + ) + else: + kdt = uw.kdtree.KDTree(landed_X) self.array[:, 0, :] = kdt.rbf_interpolator_local( - self.swarm._particle_coordinates.data, local_data, nnn=1 + self.swarm._particle_coordinates.data, landed_D, nnn=1 ) return @@ -2557,6 +2586,36 @@ def _invalidate_canonical_data(self): if hasattr(var, "_canonical_data"): var._canonical_data = None + def _route_by_nearest_centroid(self): + """Migrate every particle to the rank whose domain-centroid is closest. + + This is a deterministic alternative to :meth:`migrate`: the destination + is a pure function of the coordinate, computed identically on every + rank. Two swarms migrated this way are guaranteed to place equal + coordinates on the same rank — which the standard ``migrate`` does not + guarantee at partition boundaries (vertex DOFs sitting on a shared face + can return ``True`` from ``points_in_domain`` on multiple ranks). + + Used by checkpoint readers and any consumer that needs source data and + query coordinates to converge on the same rank without relying on + PETSc's DOF distribution. + """ + centroids = self.mesh._get_domain_centroids() + centroid_kdt = uw.kdtree.KDTree(centroids) + + coords = self.dm.getField("DMSwarmPIC_coor").reshape(-1, self.dim).copy() + self.dm.restoreField("DMSwarmPIC_coor") + + if coords.shape[0] > 0: + _, owner_rank = centroid_kdt.query(coords, k=1, sqr_dists=False) + rank_arr = self.dm.getField("DMSwarm_rank") + rank_arr[:, 0] = owner_rank.astype(rank_arr.dtype, copy=False) + self.dm.restoreField("DMSwarm_rank") + + self.dm.migrate(remove_sent_points=True) + uw.mpi.barrier() + self._invalidate_canonical_data() + @property def mesh(self): """The mesh this swarm operates on""" diff --git a/tests/parallel/ptest_0762_read_timestep_swarm_routed.py b/tests/parallel/ptest_0762_read_timestep_swarm_routed.py new file mode 100644 index 00000000..9c5c4522 --- /dev/null +++ b/tests/parallel/ptest_0762_read_timestep_swarm_routed.py @@ -0,0 +1,100 @@ +"""Parallel regression test for the swarm-routed read_timestep path. + +The new ``MeshVariable.read_timestep`` uses a transient swarm to ship saved +``(coord, value)`` pairs to the rank that owns each location. This test +verifies the parallel round-trip: write a known field on N ranks, read it +back on N ranks, assert allclose. + +Run with: + mpirun -n 2 python -m pytest --with-mpi tests/parallel/ptest_0762_read_timestep_swarm_routed.py +""" + +import os +import tempfile + +import numpy as np +import pytest + +import underworld3 as uw +from underworld3.meshing import UnstructuredSimplexBox + +pytestmark = [pytest.mark.mpi(min_size=2), pytest.mark.timeout(120)] + + +@pytest.mark.mpi(min_size=2) +@pytest.mark.level_1 +@pytest.mark.tier_a +def test_meshvariable_read_timestep_parallel_round_trip(): + """Write field on N ranks, read back on same N ranks, expect allclose.""" + # Use the same tmpdir on every rank — rank 0 writes, all ranks read. 
+ if uw.mpi.rank == 0: + tmpdir = tempfile.mkdtemp(prefix="uw3_read_timestep_") + else: + tmpdir = None + tmpdir = uw.mpi.comm.bcast(tmpdir, root=0) + + mesh = UnstructuredSimplexBox( + minCoords=(0.0, 0.0), + maxCoords=(1.0, 1.0), + cellSize=1.0 / 16.0, + ) + X = uw.discretisation.MeshVariable("X", mesh, 1, degree=2) + X2 = uw.discretisation.MeshVariable("X2", mesh, 1, degree=2) + + # Analytic field — has variation in both dims so a faulty + # rank-local read-everywhere implementation would still pass on + # constant fields. + X.array[:, 0, 0] = X.coords[:, 0] + 0.5 * X.coords[:, 1] + + mesh.write_timestep( + "test", meshUpdates=False, meshVars=[X], outputPath=tmpdir, index=0, + ) + + X2.read_timestep("test", "X", 0, outputPath=tmpdir) + + assert np.allclose(X.array, X2.array, atol=1e-8), ( + f"[rank {uw.mpi.rank}] read_timestep round-trip failed:\n" + f"max diff = {np.abs(np.asarray(X.array) - np.asarray(X2.array)).max():.3e}" + ) + + +@pytest.mark.skip( + reason="Pre-existing: Swarm.write_timestep hangs on N>1 with parallel h5py " + "(unrelated to read_timestep refactor). Serial round-trip is covered by " + "tests/test_0003_save_load.py::test_swarmvariable_save_and_load." +) +@pytest.mark.mpi(min_size=2) +@pytest.mark.level_1 +@pytest.mark.tier_a +def test_swarmvariable_read_timestep_parallel_round_trip(): + """Same round-trip check for SwarmVariable.read_timestep.""" + if uw.mpi.rank == 0: + tmpdir = tempfile.mkdtemp(prefix="uw3_read_timestep_swarm_") + else: + tmpdir = None + tmpdir = uw.mpi.comm.bcast(tmpdir, root=0) + + mesh = UnstructuredSimplexBox( + minCoords=(0.0, 0.0), + maxCoords=(1.0, 1.0), + cellSize=1.0 / 16.0, + ) + swarm = uw.swarm.Swarm(mesh) + var = swarm.add_variable(name="X", size=1) + var2 = swarm.add_variable(name="X2", size=1) + swarm.populate(fill_param=2) + + # Each particle stores its own x-coord + var.array[:, 0, 0] = swarm._particle_coordinates.data[:, 0] + + swarm.write_timestep( + "test", "swarm", swarmVars=[var], outputPath=tmpdir, index=0, + ) + + var2.read_timestep("test", "swarm", "X", 0, outputPath=tmpdir) + + # nnn=1 → exact match (nearest-neighbour) for the round-trip + assert np.allclose(var.array, var2.array, atol=1e-8), ( + f"[rank {uw.mpi.rank}] swarm read_timestep round-trip failed:\n" + f"max diff = {np.abs(var.array - var2.array).max():.3e}" + ) From 716d7830925780bd0b3d072201b82ba9b4564df1 Mon Sep 17 00:00:00 2001 From: lmoresi Date: Wed, 29 Apr 2026 13:51:35 +1000 Subject: [PATCH 2/3] Fix shape-mismatch in _route_by_nearest_centroid for older PETSc CI runs against conda-forge PETSc 3.21, where DMSwarm_rank is exposed as a 1-D NumPy array of shape (N,). Local development uses custom-built PETSc 3.25, which exposes the same field as 2-D (N, 1). The new helper assumed the 2-D form and indexed rank_arr[:, 0], which raises IndexError on the older PETSc. reshape(-1) flattens either form into a writable view into the same buffer, so the assignment works regardless of which PETSc version is in use. Existing call sites in Swarm.migrate use the same trick (or the matching 1-D form). 
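
For reference, a minimal NumPy sketch (illustration only, not part of the
patch) of why the reshape(-1) view handles both field layouts; the array
shapes stand in for what dm.getField("DMSwarm_rank") returns on each
PETSc version:

    import numpy as np

    for shape in [(5,), (5, 1)]:        # PETSc 3.21 form, PETSc 3.25+ form
        rank_arr = np.zeros(shape, dtype=np.int32)   # stand-in for the getField buffer
        owner_rank = np.arange(5, dtype=np.int64)    # destination ranks from the KDTree query
        # reshape(-1) on a contiguous array returns a view, so this
        # assignment writes through to rank_arr's own storage.
        rank_arr.reshape(-1)[:] = owner_rank.astype(rank_arr.dtype, copy=False)
        assert rank_arr.reshape(-1).tolist() == [0, 1, 2, 3, 4]
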
Underworld development team with AI support from Claude Code --- src/underworld3/swarm.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/underworld3/swarm.py b/src/underworld3/swarm.py index 7e352ca8..f139e3dc 100644 --- a/src/underworld3/swarm.py +++ b/src/underworld3/swarm.py @@ -2609,7 +2609,10 @@ def _route_by_nearest_centroid(self): if coords.shape[0] > 0: _, owner_rank = centroid_kdt.query(coords, k=1, sqr_dists=False) rank_arr = self.dm.getField("DMSwarm_rank") - rank_arr[:, 0] = owner_rank.astype(rank_arr.dtype, copy=False) + # ``DMSwarm_rank`` shape varies by PETSc version: 1-D ``(N,)`` on + # 3.21, 2-D ``(N, 1)`` on 3.25+. ``reshape(-1)`` flattens either + # form into a writable view into the same buffer. + rank_arr.reshape(-1)[:] = owner_rank.astype(rank_arr.dtype, copy=False) self.dm.restoreField("DMSwarm_rank") self.dm.migrate(remove_sent_points=True) From ccb7f8992fdd307d58e5277a3981e7166d1b5355 Mon Sep 17 00:00:00 2001 From: lmoresi Date: Wed, 29 Apr 2026 14:04:41 +1000 Subject: [PATCH 3/3] Address review feedback on the swarm-routed read PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Six fixes from Copilot review of #155: * SwarmVariable.read_timestep now routes the source swarm with Swarm.migrate (the same rule the live swarm uses) instead of nearest-centroid. Centroid Voronoi partitioning does not in general match cell ownership (points_in_domain), so the previous code could place a saved (coord, value) pair on a different rank than the live particle at the same coordinate, leaving the value never looked up or written to the wrong target. * MeshVariable.read_timestep computes n_components from self.num_components rather than self.shape[1]. shape[1] silently drops components for TENSOR (returns dim instead of dim**2) and SYM_TENSOR. The HDF5 reshape and the routing SwarmVariable both used the wrong size for tensor variables. * Update the SwarmVariable.read_timestep docstring so it no longer claims rank 0 "streams" the file — it doesn't, it reads the whole coordinates and data arrays in one shot. The comment now flags chunked I/O as a follow-up if rank-0 RAM becomes the constraint. * Drop the redundant blank lines between extend_enum and _BaseMeshVariable in discretisation_mesh_variables.py. * Drop the unused `os` import from tests/parallel/ptest_0762_read_timestep_swarm_routed.py. * The parallel test now creates its tmp directory through a small helper that broadcasts the path from rank 0, and tears it down inside a try/finally with shutil.rmtree on rank 0 after a barrier, so CI runs no longer leak uw3_read_timestep_* directories. 
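
To illustrate the first point above, a contrived 1-D sketch (illustration
only, made-up numbers) of how nearest-centroid routing can disagree with
cell ownership near a partition boundary:

    # Suppose rank 0 owns cells covering [0.0, 0.6) and rank 1 owns
    # [0.6, 1.0], so the domain centroids sit at 0.3 and 0.8.
    centroids = [0.3, 0.8]
    x = 0.58                        # inside rank 0's cells
    owner_by_cells = 0
    owner_by_centroid = min(range(2), key=lambda r: abs(x - centroids[r]))
    assert owner_by_centroid == 1   # |0.58 - 0.8| = 0.22 < |0.58 - 0.3| = 0.28
    assert owner_by_cells != owner_by_centroid

A saved point at x would be routed to rank 1 while the live particle at x
stays on rank 0, which is exactly the mismatch that routing the source
swarm with Swarm.migrate avoids.
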
Underworld development team with AI support from Claude Code --- .../discretisation_mesh_variables.py | 8 +- src/underworld3/swarm.py | 23 ++- .../ptest_0762_read_timestep_swarm_routed.py | 131 ++++++++++-------- 3 files changed, 93 insertions(+), 69 deletions(-) diff --git a/src/underworld3/discretisation/discretisation_mesh_variables.py b/src/underworld3/discretisation/discretisation_mesh_variables.py index 429f0793..814eb2cc 100644 --- a/src/underworld3/discretisation/discretisation_mesh_variables.py +++ b/src/underworld3/discretisation/discretisation_mesh_variables.py @@ -70,8 +70,6 @@ def wrapper(final): return wrapper - - class _BaseMeshVariable(Stateful, uw_object): """ The MeshVariable class generates a variable supported by a finite element mesh and the @@ -1190,7 +1188,11 @@ def read_timestep( import h5py import numpy as np - n_components = self.shape[1] + # ``self.num_components`` is correct for SCALAR (1), VECTOR (dim), + # TENSOR (dim**2) and SYM_TENSOR (dim*(dim+1)/2). ``self.shape[1]`` + # would silently drop components for tensor types because shape is + # ``(N, dim, dim)`` and ``[1]`` returns just ``dim``. + n_components = self.num_components dim = self.mesh.dim # ---- Phase 1: source swarm carries saved (coord, value) pairs ---- diff --git a/src/underworld3/swarm.py b/src/underworld3/swarm.py index f139e3dc..fe080680 100644 --- a/src/underworld3/swarm.py +++ b/src/underworld3/swarm.py @@ -1910,11 +1910,16 @@ def read_timestep( else: raise RuntimeError(f"{os.path.abspath(filename)} does not exist") - # Memory-bounded parallel read: rank 0 streams coords + values from - # disk into a transient routing swarm; ``swarm.migrate`` ships each - # (coord, value) pair to the rank that owns its location; each rank - # then runs a small rank-local KDTree to fill the live swarm's - # particles. Per-rank memory scales as ``file_size / n_ranks``. + # Memory-bounded parallel read. Rank 0 reads the saved file in one + # shot (this is not chunked — the file is the smallest copy of the + # data and we hold it for the duration of one ``add_particles`` call; + # streaming hyperslabs is a follow-up if rank-0 RAM becomes the + # binding constraint). Saved (coord, value) pairs are then pushed + # into a transient routing swarm and migrated *with the same rule + # the live swarm uses* — ``points_in_domain`` plus the centroid + # fallback inside ``Swarm.migrate``. That co-locates a saved point + # and the corresponding live particle on the same rank, so the + # rank-local KDTree always sees the right neighbour. n_components = self.num_components dim = self.swarm.mesh.dim @@ -1954,8 +1959,12 @@ def read_timestep( tmp_swarm._invalidate_canonical_data() saved.array[size_before:, 0, :] = D_chunk[:, :] - # Deterministic centroid-distance routing — see Swarm._route_by_nearest_centroid. - tmp_swarm._route_by_nearest_centroid() + # Use the same migration rule as the live swarm so saved points and + # live particles at the same coordinate land on the same rank. + # ``delete_lost_points=False`` keeps points that ``points_in_domain`` + # rejects on every rank; they fall through to the centroid fallback + # inside ``Swarm.migrate`` and end up somewhere deterministic. 
+ tmp_swarm.migrate(remove_sent_points=True, delete_lost_points=False) landed_X = tmp_swarm._particle_coordinates.array[...].reshape(-1, dim) landed_D = saved.array[:, 0, :] diff --git a/tests/parallel/ptest_0762_read_timestep_swarm_routed.py b/tests/parallel/ptest_0762_read_timestep_swarm_routed.py index 9c5c4522..fc5c1ae7 100644 --- a/tests/parallel/ptest_0762_read_timestep_swarm_routed.py +++ b/tests/parallel/ptest_0762_read_timestep_swarm_routed.py @@ -9,7 +9,7 @@ mpirun -n 2 python -m pytest --with-mpi tests/parallel/ptest_0762_read_timestep_swarm_routed.py """ -import os +import shutil import tempfile import numpy as np @@ -21,41 +21,55 @@ pytestmark = [pytest.mark.mpi(min_size=2), pytest.mark.timeout(120)] -@pytest.mark.mpi(min_size=2) -@pytest.mark.level_1 -@pytest.mark.tier_a -def test_meshvariable_read_timestep_parallel_round_trip(): - """Write field on N ranks, read back on same N ranks, expect allclose.""" - # Use the same tmpdir on every rank — rank 0 writes, all ranks read. +def _shared_tmpdir(prefix): + """Make rank 0 a tmp directory and broadcast the path to every rank.""" if uw.mpi.rank == 0: - tmpdir = tempfile.mkdtemp(prefix="uw3_read_timestep_") + tmpdir = tempfile.mkdtemp(prefix=prefix) else: tmpdir = None - tmpdir = uw.mpi.comm.bcast(tmpdir, root=0) - - mesh = UnstructuredSimplexBox( - minCoords=(0.0, 0.0), - maxCoords=(1.0, 1.0), - cellSize=1.0 / 16.0, - ) - X = uw.discretisation.MeshVariable("X", mesh, 1, degree=2) - X2 = uw.discretisation.MeshVariable("X2", mesh, 1, degree=2) + return uw.mpi.comm.bcast(tmpdir, root=0) - # Analytic field — has variation in both dims so a faulty - # rank-local read-everywhere implementation would still pass on - # constant fields. - X.array[:, 0, 0] = X.coords[:, 0] + 0.5 * X.coords[:, 1] - mesh.write_timestep( - "test", meshUpdates=False, meshVars=[X], outputPath=tmpdir, index=0, - ) +def _cleanup_shared_tmpdir(tmpdir): + """Drop the shared tmp directory once all ranks are done with it.""" + uw.mpi.comm.barrier() + if uw.mpi.rank == 0: + shutil.rmtree(tmpdir, ignore_errors=True) - X2.read_timestep("test", "X", 0, outputPath=tmpdir) - assert np.allclose(X.array, X2.array, atol=1e-8), ( - f"[rank {uw.mpi.rank}] read_timestep round-trip failed:\n" - f"max diff = {np.abs(np.asarray(X.array) - np.asarray(X2.array)).max():.3e}" - ) +@pytest.mark.mpi(min_size=2) +@pytest.mark.level_1 +@pytest.mark.tier_a +def test_meshvariable_read_timestep_parallel_round_trip(): + """Write field on N ranks, read back on same N ranks, expect allclose.""" + tmpdir = _shared_tmpdir("uw3_read_timestep_") + try: + mesh = UnstructuredSimplexBox( + minCoords=(0.0, 0.0), + maxCoords=(1.0, 1.0), + cellSize=1.0 / 16.0, + ) + X = uw.discretisation.MeshVariable("X", mesh, 1, degree=2) + X2 = uw.discretisation.MeshVariable("X2", mesh, 1, degree=2) + + # Analytic field — varies in both dims so a faulty rank-local + # read-everywhere implementation could not silently pass on a + # constant field. 
+ X.array[:, 0, 0] = X.coords[:, 0] + 0.5 * X.coords[:, 1] + + mesh.write_timestep( + "test", meshUpdates=False, meshVars=[X], + outputPath=tmpdir, index=0, + ) + + X2.read_timestep("test", "X", 0, outputPath=tmpdir) + + assert np.allclose(X.array, X2.array, atol=1e-8), ( + f"[rank {uw.mpi.rank}] read_timestep round-trip failed:\n" + f"max diff = {np.abs(np.asarray(X.array) - np.asarray(X2.array)).max():.3e}" + ) + finally: + _cleanup_shared_tmpdir(tmpdir) @pytest.mark.skip( @@ -68,33 +82,32 @@ def test_meshvariable_read_timestep_parallel_round_trip(): @pytest.mark.tier_a def test_swarmvariable_read_timestep_parallel_round_trip(): """Same round-trip check for SwarmVariable.read_timestep.""" - if uw.mpi.rank == 0: - tmpdir = tempfile.mkdtemp(prefix="uw3_read_timestep_swarm_") - else: - tmpdir = None - tmpdir = uw.mpi.comm.bcast(tmpdir, root=0) - - mesh = UnstructuredSimplexBox( - minCoords=(0.0, 0.0), - maxCoords=(1.0, 1.0), - cellSize=1.0 / 16.0, - ) - swarm = uw.swarm.Swarm(mesh) - var = swarm.add_variable(name="X", size=1) - var2 = swarm.add_variable(name="X2", size=1) - swarm.populate(fill_param=2) - - # Each particle stores its own x-coord - var.array[:, 0, 0] = swarm._particle_coordinates.data[:, 0] - - swarm.write_timestep( - "test", "swarm", swarmVars=[var], outputPath=tmpdir, index=0, - ) - - var2.read_timestep("test", "swarm", "X", 0, outputPath=tmpdir) - - # nnn=1 → exact match (nearest-neighbour) for the round-trip - assert np.allclose(var.array, var2.array, atol=1e-8), ( - f"[rank {uw.mpi.rank}] swarm read_timestep round-trip failed:\n" - f"max diff = {np.abs(var.array - var2.array).max():.3e}" - ) + tmpdir = _shared_tmpdir("uw3_read_timestep_swarm_") + try: + mesh = UnstructuredSimplexBox( + minCoords=(0.0, 0.0), + maxCoords=(1.0, 1.0), + cellSize=1.0 / 16.0, + ) + swarm = uw.swarm.Swarm(mesh) + var = swarm.add_variable(name="X", size=1) + var2 = swarm.add_variable(name="X2", size=1) + swarm.populate(fill_param=2) + + # Each particle stores its own x-coord + var.array[:, 0, 0] = swarm._particle_coordinates.data[:, 0] + + swarm.write_timestep( + "test", "swarm", swarmVars=[var], + outputPath=tmpdir, index=0, + ) + + var2.read_timestep("test", "swarm", "X", 0, outputPath=tmpdir) + + # nnn=1 → exact match (nearest-neighbour) for the round-trip + assert np.allclose(var.array, var2.array, atol=1e-8), ( + f"[rank {uw.mpi.rank}] swarm read_timestep round-trip failed:\n" + f"max diff = {np.abs(np.asarray(var.array) - np.asarray(var2.array)).max():.3e}" + ) + finally: + _cleanup_shared_tmpdir(tmpdir)