From 135c06034715be2464a2464fba9d98f37b08165f Mon Sep 17 00:00:00 2001 From: Dan Vanderkam Date: Mon, 22 Jun 2026 12:26:40 -0400 Subject: [PATCH 1/2] Simplify and generalize Helmert logic --- ohmg/georeference/georeferencer.py | 165 ++++++++++------------------- tests/test_warp.py | 17 ++- 2 files changed, 64 insertions(+), 118 deletions(-) diff --git a/ohmg/georeference/georeferencer.py b/ohmg/georeference/georeferencer.py index ebe68470..d621bb98 100644 --- a/ohmg/georeference/georeferencer.py +++ b/ohmg/georeference/georeferencer.py @@ -3,20 +3,33 @@ import os import sys import time +from dataclasses import dataclass from io import StringIO from pathlib import Path -from typing import List, Tuple, Union +from typing import List from uuid import uuid4 +import numpy as np from django.conf import settings from osgeo import gdal, ogr, osr from ohmg.core.utils.srs import retrieve_srs_wkt -from .geometry import azimuth_from_coords - logger = logging.getLogger(__name__) + +@dataclass +class HelmertParams: + """The four parameters of a Helmert (similarity) transformation that maps + image pixel/line coordinates to geographic coordinates.""" + + scale: float + # rotation in degrees, measured as an azimuth from north (the positive Y axis) + rotation: float + # geographic coordinates that the image origin (pixel=0, line=0) maps to + offset_x: float + offset_y: float + gdal.SetConfigOption("GDAL_NUM_THREADS", "ALL_CPUS") TRANSFORMATION_LOOKUP = { @@ -244,106 +257,48 @@ def _load_gcps_from_geojson(self, geo_json): ) self.gcps.append(gcp) - def _calculate_scale(self) -> float: - """Compares two GCPs and returns a scale factor.""" + def _get_helmert_params(self) -> HelmertParams: + """Fit a Helmert (four-parameter similarity) transformation to the GCPs + using least squares, returning the scale, rotation, and offsets. - if len(self.gcps) < 2: - raise Exception("At least GCPs are needed to calculate a scale factor") + The transformation maps image pixel/line coordinates to geographic + coordinates as: - gcp1, gcp2 = self.gcps[0], self.gcps[1] - - # distance between the geographic coords in each GCP - # this distance is absolute so the order of the coords doesn't matter - geo_dist = math.dist( - (gcp1.GCPX, gcp1.GCPY), - (gcp2.GCPX, gcp2.GCPY), - ) - - # distance between the pixel coords in each GCP - # this distance is absolute so the order of the coords doesn't matter - img_dist = math.dist( - (gcp1.GCPPixel, gcp1.GCPLine), - (gcp2.GCPPixel, gcp2.GCPLine), - ) + geoX = offset_x + a * line - b * pixel + geoY = offset_y + a * pixel + b * line - return geo_dist / img_dist - - def _calculate_rotation(self, img_height: Union[float, None] = None) -> float: - """Compares two GCPs and calculates the difference in the angles - between the geometric points and the pixel points. - - Returns the angle in degrees from 'north', i.e. the positive Y axis""" + where (a, b) = (scale * cos(angle), scale * sin(angle)). Because this is + linear in the four unknowns (a, b, offset_x, offset_y), it can be solved + directly from two equations per GCP -- exactly determined with two GCPs, + and a best fit with more. This handles every orientation without any + special-casing. + """ if len(self.gcps) < 2: - raise Exception("At least two GCPs are needed to calculate a scale factor") - - # sort the GCPs so they are ordered from lowest to highest, - # then right to left, as they appear on the source image. - # this allows us to figure out the orientation - self.gcps.sort(key=lambda x: x.GCPPixel) - self.gcps.sort(key=lambda x: x.GCPLine, reverse=True) - - # make sure img height is set because it is needed to properly - # handle the inverted Y coords. - if not img_height: - ds = gdal.Open(self.gcps_vrt.get_vsi_url()) - img_height = ds.RasterYSize - img_coords = [(i.GCPPixel, img_height - i.GCPLine) for i in self.gcps] - img_azimuth = azimuth_from_coords(img_coords) - - geo_coords = [(i.GCPX, i.GCPY) for i in self.gcps] - geo_azimuth = azimuth_from_coords(geo_coords) - - difference = geo_azimuth - img_azimuth - return difference - - def _calculate_helmert_offsets(self, scale: float, rotation: float) -> Tuple[float, float]: - """Calculates the x and y offsets needed for helmert transformation, based - on GCP - """ + raise Exception("At least two GCPs are needed to fit a Helmert transformation") + + # build two rows (the geoX and geoY equations) per GCP + matrix, targets = [], [] + for gcp in self.gcps: + matrix.append([gcp.GCPLine, -gcp.GCPPixel, 1, 0]) + targets.append(gcp.GCPX) + matrix.append([gcp.GCPPixel, gcp.GCPLine, 0, 1]) + targets.append(gcp.GCPY) + + (a, b, offset_x, offset_y), *_ = np.linalg.lstsq( + np.array(matrix, dtype=float), np.array(targets, dtype=float), rcond=None + ) - ## get the gcp that is closest to the top of the page - use_gcp = min(self.gcps, key=lambda x: x.GCPPixel) - - ## use the scale factor to calculate the real distance to page extent - dist_to_page_edge = use_gcp.GCPLine * scale - dist_to_page_top = use_gcp.GCPPixel * scale - - ## rotation is azimuth, i.e. degrees from positive y-axis, - ## must adjust to be degrees from positive x-axis - theta1 = 90 - rotation - ## further adjustments to normalize - if theta1 < 0: - theta1 += 180 - if theta1 > 180: - theta1 -= 180 - - ## calculate the target angle q, the angle from the GCP coord - ## to the top left corner of the image, relative to negative - ## x-axis - r_degrees = 180 - (90 + theta1) - r_radians = math.radians(r_degrees) - s_radians = math.atan(dist_to_page_edge / dist_to_page_top) - q_radians = r_radians + s_radians - - ## get the real distance between the GCP coord and the page corner - dist_to_page_corner = math.sqrt(dist_to_page_edge**2 + dist_to_page_top**2) - - ## calculate the offset distances using q angle and distance from - ## GCP to page corner as hypotenuse - x_offset = math.cos(q_radians) * dist_to_page_corner - y_offset = math.sin(q_radians) * dist_to_page_corner - - ## adjust the GCP's coords by adding/subtracting the offsets based - ## on whether the top of the page is above or below the x-axis - if abs(rotation) > 90: - dx = use_gcp.GCPX + x_offset - dy = use_gcp.GCPY - y_offset - else: - dx = use_gcp.GCPX - x_offset - dy = use_gcp.GCPY + y_offset + scale = math.hypot(a, b) + # convert the fitted angle to an azimuth in degrees from north + rotation = (270 - math.degrees(math.atan2(b, a))) % 360 - return (dx, dy) + return HelmertParams( + scale=scale, + rotation=rotation, + offset_x=offset_x, + offset_y=offset_y, + ) def cleanup_files(self): for vrt in [ @@ -400,22 +355,18 @@ def make_warped_vrt( self.make_gcps_vrt(src_path, out_name) self.warped_vrt = VRTHandler(out_name, as_variant="modified") - if len(self.gcps) == 2 and self.transformation["id"] == "helmert": - ## get scale factor - scale = self._calculate_scale() - - ## get rotation - rotation = self._calculate_rotation() - ## adjust and convert to arcseconds - arcseconds = (rotation + 90) * 3600 + if self.transformation["id"] == "helmert": + ## fit the four-parameter Helmert model in one shot + params = self._get_helmert_params() - ## get the x y offsets - dx, dy = self._calculate_helmert_offsets(scale, rotation) + ## convert the rotation to arcseconds for the proj pipeline + arcseconds = (params.rotation + 90) * 3600 pipeline = ( "+proj=pipeline " "+step +proj=axisswap +order=2,1 " - f"+step +proj=helmert +x={dx} +y={dy} +theta={arcseconds} +s={scale}" + f"+step +proj=helmert +x={params.offset_x} +y={params.offset_y} " + f"+theta={arcseconds} +s={params.scale}" ) logger.debug(f"applying: {pipeline}") diff --git a/tests/test_warp.py b/tests/test_warp.py index 16348888..f2403635 100644 --- a/tests/test_warp.py +++ b/tests/test_warp.py @@ -10,15 +10,13 @@ @tag("warp") class HelmertTransformationTestCase(OHMGTestCase): + def test_helmert_transformation_calculations(self): """This test runs through 8 permutations of GCPs, and makes sure that the calculations used to set up the helmert transformations return the right values for every permutation. """ - # assume an image with these dimensions - img_width, img_height = 5, 8 # noqa: F841 - # inner (smallest) angle for a 3,4,5 triangle theta_345 = math.degrees(math.asin(3 / 5)) @@ -49,11 +47,8 @@ def test_helmert_transformation_calculations(self): g = Georeferencer(crs="EPSG:3857", transformation="helmert", gcps_gdal=[gcp1, gcp2]) - scale = g._calculate_scale() - self.assertEqual(scale, 0.5) - rotation = g._calculate_rotation(img_height=img_height) - self.assertEqual(rotation, target_rotation) - - dx, dy = g._calculate_helmert_offsets(scale=scale, rotation=rotation) - self.assertAlmostEqual(dx, x_offset) - self.assertAlmostEqual(dy, y_offset) + params = g._get_helmert_params() + self.assertAlmostEqual(params.scale, 0.5) + self.assertAlmostEqual(params.rotation, target_rotation) + self.assertAlmostEqual(params.offset_x, x_offset) + self.assertAlmostEqual(params.offset_y, y_offset) From 683faadc4e7842804bf6c47dad6c48696bff5fe8 Mon Sep 17 00:00:00 2001 From: Dan Vanderkam Date: Tue, 23 Jun 2026 12:47:25 -0400 Subject: [PATCH 2/2] remove unused azimuth_from_coords function --- ohmg/georeference/geometry.py | 55 +---------------------------------- 1 file changed, 1 insertion(+), 54 deletions(-) diff --git a/ohmg/georeference/geometry.py b/ohmg/georeference/geometry.py index 9c7f96dc..9b1cc3de 100644 --- a/ohmg/georeference/geometry.py +++ b/ohmg/georeference/geometry.py @@ -1,62 +1,9 @@ import math -from typing import List, Tuple +from typing import Tuple -import numpy as np from django.contrib.gis.geos import LineString -def azimuth_from_coords(coords: List[Tuple[float, float]]) -> float: - """Calculate the least-squares slope of the input coordinates, return - as degrees relative to the x axis.""" - - x_coords = [i[0] for i in coords] - y_coords = [i[1] for i in coords] - x_diff = x_coords[-1] - x_coords[0] - y_diff = y_coords[-1] - y_coords[0] - - # handle cases where the fit line would be horizontal or vertical, - # as well as an issue where passing all 0s to np.polyfit (e.g. all - # of the x values are 0, even though there are different y values) - # raises an exception - if len(set(x_coords)) == 1: - if y_diff > 0: - azimuth = 0 - else: - azimuth = 180 - elif len(set(y_coords)) == 1: - if x_diff > 0: - azimuth = 90 - else: - azimuth = 270 - # now handle all other cases by calculating the slope - else: - slope, intercept = np.polyfit(x_coords, y_coords, 1) - - # this is the angle from 0 axis - angle = math.degrees(math.atan(slope)) - - # now convert the angle to degrees from north by comparing - # the first and last set of coords to determine the general - # orientation of the fit line - x_diff = coords[-1][0] - coords[0][0] - y_diff = coords[-1][1] - coords[0][1] - - # orientation: ne - if x_diff > 0 and y_diff > 0: - azimuth = 90 - angle - # orientation: se - elif x_diff > 0 and y_diff < 0: - azimuth = 90 + abs(angle) - # orientation: sw - elif x_diff < 0 and y_diff < 0: - azimuth = 270 - angle - # orientation: nw - elif x_diff < 0 and y_diff > 0: - azimuth = 270 + abs(angle) - - return azimuth - - def extend_vector( p1: Tuple[float, float], p2: Tuple[float, float],