From 3cd993ca369ca9c0936df3510e80e7bf53cfca6f Mon Sep 17 00:00:00 2001 From: Caglar Pir Date: Thu, 5 Mar 2026 01:17:47 -0800 Subject: [PATCH] Add unit tests to increase coverage from 65% to 68% Add 103 new unit tests across 6 test files targeting modules with the lowest coverage: - test_gpx_serializer.py: GPX serialization (32% -> 99%) - test_http.py: HTTP utilities, truncation, sanitization (23% -> 55%) - test_api_v4.py: API client, auth error handling (33% -> 70%) - test_ipc.py: IPC write/send (0% -> 100%) - test_history.py: Upload history read/write (59% -> 81%) - test_gpmf_gps_filter.py: GPS filtering and outlier removal (22% -> 97%) --- tests/unit/test_api_v4.py | 116 +++++++++++++++ tests/unit/test_gpmf_gps_filter.py | 229 +++++++++++++++++++++++++++++ tests/unit/test_gpx_serializer.py | 216 +++++++++++++++++++++++++++ tests/unit/test_history.py | 150 +++++++++++++++++++ tests/unit/test_http.py | 142 ++++++++++++++++++ tests/unit/test_ipc.py | 49 ++++++ 6 files changed, 902 insertions(+) create mode 100644 tests/unit/test_api_v4.py create mode 100644 tests/unit/test_gpmf_gps_filter.py create mode 100644 tests/unit/test_gpx_serializer.py create mode 100644 tests/unit/test_history.py create mode 100644 tests/unit/test_http.py create mode 100644 tests/unit/test_ipc.py diff --git a/tests/unit/test_api_v4.py b/tests/unit/test_api_v4.py new file mode 100644 index 00000000..c0e25982 --- /dev/null +++ b/tests/unit/test_api_v4.py @@ -0,0 +1,116 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. + +from unittest.mock import MagicMock + +import pytest +import requests + +from mapillary_tools import api_v4 + + +class TestCreateSessions: + def test_create_user_session_sets_oauth_header(self): + session = api_v4.create_user_session("test_token_123") + assert session.headers["Authorization"] == "OAuth test_token_123" + + def test_create_client_session_sets_oauth_header(self): + session = api_v4.create_client_session() + assert session.headers["Authorization"].startswith("OAuth ") + + +class TestIsAuthError: + def _make_response(self, status_code: int, json_data=None): + resp = MagicMock(spec=requests.Response) + resp.status_code = status_code + if json_data is not None: + resp.json.return_value = json_data + else: + resp.json.side_effect = Exception("no json") + return resp + + def test_401_is_auth_error(self): + resp = self._make_response(401) + assert api_v4.is_auth_error(resp) is True + + def test_403_is_auth_error(self): + resp = self._make_response(403) + assert api_v4.is_auth_error(resp) is True + + def test_400_with_not_authorized_type(self): + resp = self._make_response( + 400, + json_data={"debug_info": {"type": "NotAuthorizedError"}}, + ) + assert api_v4.is_auth_error(resp) is True + + def test_400_without_auth_type(self): + resp = self._make_response( + 400, + json_data={"debug_info": {"type": "SomeOtherError"}}, + ) + assert api_v4.is_auth_error(resp) is False + + def test_400_no_json(self): + resp = self._make_response(400) + assert api_v4.is_auth_error(resp) is False + + def test_200_is_not_auth_error(self): + resp = self._make_response(200) + assert api_v4.is_auth_error(resp) is False + + def test_500_is_not_auth_error(self): + resp = self._make_response(500) + assert api_v4.is_auth_error(resp) is False + + +class TestExtractAuthErrorMessage: + def _make_auth_response(self, status_code: int, json_data=None, text: str = ""): + resp = MagicMock(spec=requests.Response) + resp.status_code = status_code + resp.text = text + if json_data is not None: + resp.json.return_value = json_data + else: + resp.json.side_effect = Exception("no json") + return resp + + def test_graph_api_error_message(self): + resp = self._make_auth_response( + 401, + json_data={"error": {"message": "Invalid token"}}, + ) + assert api_v4.extract_auth_error_message(resp) == "Invalid token" + + def test_upload_service_error_message(self): + resp = self._make_auth_response( + 403, + json_data={"debug_info": {"message": "Forbidden access"}}, + ) + assert api_v4.extract_auth_error_message(resp) == "Forbidden access" + + def test_fallback_to_text(self): + resp = self._make_auth_response( + 401, + json_data={}, + text="Unauthorized", + ) + assert api_v4.extract_auth_error_message(resp) == "Unauthorized" + + def test_no_json_fallback(self): + resp = self._make_auth_response( + 401, + text="Auth failed", + ) + assert api_v4.extract_auth_error_message(resp) == "Auth failed" + + +class TestJsonifyResponse: + def test_invalid_json_raises_http_content_error(self): + resp = MagicMock(spec=requests.Response) + resp.json.side_effect = requests.JSONDecodeError("err", "", 0) + with pytest.raises(api_v4.HTTPContentError) as exc_info: + api_v4.jsonify_response(resp) + assert exc_info.value.response is resp diff --git a/tests/unit/test_gpmf_gps_filter.py b/tests/unit/test_gpmf_gps_filter.py new file mode 100644 index 00000000..c86b5c76 --- /dev/null +++ b/tests/unit/test_gpmf_gps_filter.py @@ -0,0 +1,229 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. + +from __future__ import annotations + +import statistics + +import pytest + +from mapillary_tools.geo import Point +from mapillary_tools.gpmf import gps_filter +from mapillary_tools.gpmf.gpmf_gps_filter import remove_noisy_points, remove_outliers +from mapillary_tools.telemetry import GPSFix, GPSPoint + + +def _make_point(time: float, lat: float, lon: float) -> Point: + return Point(time=time, lat=lat, lon=lon, alt=None, angle=None) + + +def _make_gps_point( + time: float, + lat: float, + lon: float, + fix: GPSFix | None = GPSFix.FIX_3D, + precision: float | None = 100, + ground_speed: float | None = 5.0, +) -> GPSPoint: + return GPSPoint( + time=time, + lat=lat, + lon=lon, + alt=None, + angle=None, + epoch_time=None, + fix=fix, + precision=precision, + ground_speed=ground_speed, + ) + + +# --- Tests for gps_filter module --- + + +class TestCalculatePointSpeed: + def test_same_point_zero_time(self): + p = _make_point(0.0, 48.0, 11.0) + speed = gps_filter.calculate_point_speed(p, p) + assert speed == float("inf") + + def test_same_point_different_time(self): + p1 = _make_point(0.0, 48.0, 11.0) + p2 = _make_point(10.0, 48.0, 11.0) + speed = gps_filter.calculate_point_speed(p1, p2) + assert speed == 0.0 + + def test_speed_calculation(self): + p1 = _make_point(0.0, 0.0, 0.0) + p2 = _make_point(10.0, 0.001, 0.0) # ~111 meters + speed = gps_filter.calculate_point_speed(p1, p2) + assert 10 < speed < 12 # ~11.1 m/s + + +class TestSplitIf: + def test_empty_list(self): + assert gps_filter.split_if([], lambda a, b: True) == [] + + def test_single_point(self): + p = _make_point(0.0, 0.0, 0.0) + result = gps_filter.split_if([p], lambda a, b: True) + assert len(result) == 1 + assert result[0] == [p] + + def test_no_splits(self): + points = [_make_point(float(i), 0.0, 0.0) for i in range(5)] + result = gps_filter.split_if(points, lambda a, b: False) + assert len(result) == 1 + assert len(result[0]) == 5 + + def test_split_every_point(self): + points = [_make_point(float(i), 0.0, 0.0) for i in range(5)] + result = gps_filter.split_if(points, lambda a, b: True) + assert len(result) == 5 + for seq in result: + assert len(seq) == 1 + + +class TestDistanceGt: + def test_close_points_not_split(self): + decider = gps_filter.distance_gt(100000) + p1 = _make_point(0.0, 48.0, 11.0) + p2 = _make_point(1.0, 48.001, 11.001) + assert decider(p1, p2) is False + + def test_far_points_split(self): + decider = gps_filter.distance_gt(100) + p1 = _make_point(0.0, 0.0, 0.0) + p2 = _make_point(1.0, 1.0, 1.0) + assert decider(p1, p2) is True + + +class TestSpeedLe: + def test_slow_speed_true(self): + decider = gps_filter.speed_le(1000) + p1 = _make_point(0.0, 48.0, 11.0) + p2 = _make_point(10.0, 48.001, 11.001) + assert decider(p1, p2) is True + + def test_fast_speed_false(self): + decider = gps_filter.speed_le(0.001) + p1 = _make_point(0.0, 0.0, 0.0) + p2 = _make_point(1.0, 1.0, 1.0) + assert decider(p1, p2) is False + + +class TestUpperWhiskerEdge: + def test_raises_on_single_value(self): + with pytest.raises(statistics.StatisticsError): + gps_filter.upper_whisker([1]) + + def test_even_length(self): + # [1, 2, 3, 4] -> q1=1.5, q3=3.5, irq=2, upper=3.5+3=6.5 + assert gps_filter.upper_whisker([1, 2, 3, 4]) == 6.5 + + def test_odd_length(self): + # [1, 2, 3, 4, 5] -> q1=median([1,2])=1.5, q3=median([4,5])=4.5, irq=3, upper=4.5+4.5=9.0 + assert gps_filter.upper_whisker([1, 2, 3, 4, 5]) == 9.0 + + +# --- Tests for gpmf_gps_filter module --- + + +class TestRemoveNoisyPoints: + def test_empty_sequence(self): + result = remove_noisy_points([]) + assert list(result) == [] + + def test_all_good_points(self): + points = [ + _make_gps_point( + float(i), 48.0 + i * 0.0001, 11.0, fix=GPSFix.FIX_3D, precision=100 + ) + for i in range(10) + ] + result = remove_noisy_points(points) + assert len(result) == len(points) + + def test_filters_bad_fix(self): + good_0 = _make_gps_point(0.0, 48.0, 11.0, fix=GPSFix.FIX_3D) + bad_1 = _make_gps_point(1.0, 48.001, 11.001, fix=GPSFix.NO_FIX) + good_2 = _make_gps_point(2.0, 48.002, 11.002, fix=GPSFix.FIX_3D) + result = list(remove_noisy_points([good_0, bad_1, good_2])) + # NO_FIX point should be removed; FIX_3D points kept + assert bad_1 not in result + assert good_0 in result + assert good_2 in result + + def test_filters_high_precision(self): + good_0 = _make_gps_point(0.0, 48.0, 11.0, precision=100) + bad_1 = _make_gps_point(1.0, 48.001, 11.001, precision=9999) # Very high DOP + good_2 = _make_gps_point(2.0, 48.002, 11.002, precision=100) + result = list(remove_noisy_points([good_0, bad_1, good_2])) + # High DOP point should be removed; low DOP points kept + assert bad_1 not in result + assert good_0 in result + assert good_2 in result + + def test_none_fix_kept(self): + """Points without GPS fix info should be kept.""" + points = [ + _make_gps_point(0.0, 48.0, 11.0, fix=None), + _make_gps_point(1.0, 48.001, 11.001, fix=None), + ] + result = remove_noisy_points(points) + assert len(result) == 2 + + def test_none_precision_kept(self): + """Points without precision info should be kept.""" + points = [ + _make_gps_point(0.0, 48.0, 11.0, precision=None), + _make_gps_point(1.0, 48.001, 11.001, precision=None), + ] + result = remove_noisy_points(points) + assert len(result) == 2 + + +class TestRemoveOutliers: + def test_short_sequence_unchanged(self): + points = [ + _make_gps_point(0.0, 48.0, 11.0), + ] + result = remove_outliers(points) + assert len(result) == 1 + + def test_no_ground_speed_returns_original(self): + points = [ + _make_gps_point(0.0, 48.0, 11.0, ground_speed=None), + _make_gps_point(1.0, 48.001, 11.001, ground_speed=None), + _make_gps_point(2.0, 48.002, 11.002, ground_speed=None), + ] + result = remove_outliers(points) + assert len(result) == len(points) + + def test_consistent_sequence_kept(self): + points = [ + _make_gps_point( + float(i), 48.0 + i * 0.0001, 11.0 + i * 0.0001, ground_speed=5.0 + ) + for i in range(10) + ] + result = remove_outliers(points) + assert len(result) == len(points) + + def test_outlier_removed(self): + """A point far away from a consistent cluster should be dropped.""" + # 9 points in a tight cluster, then 1 point far away + cluster = [ + _make_gps_point( + float(i), 48.0 + i * 0.00001, 11.0 + i * 0.00001, ground_speed=1.0 + ) + for i in range(9) + ] + outlier = _make_gps_point(9.0, 10.0, 10.0, ground_speed=1.0) + result = remove_outliers(cluster + [outlier]) + # The outlier is far from the cluster and should be removed + assert len(result) < len(cluster) + 1 + # The cluster points should survive + assert len(result) >= len(cluster) diff --git a/tests/unit/test_gpx_serializer.py b/tests/unit/test_gpx_serializer.py new file mode 100644 index 00000000..7744d548 --- /dev/null +++ b/tests/unit/test_gpx_serializer.py @@ -0,0 +1,216 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. + +from __future__ import annotations + +import xml.etree.ElementTree as ET +from pathlib import Path + +from mapillary_tools.geo import Point +from mapillary_tools.serializer.gpx import GPXSerializer +from mapillary_tools.telemetry import CAMMGPSPoint, GPSFix, GPSPoint +from mapillary_tools.types import ( + ErrorMetadata, + FileType, + ImageMetadata, + VideoMetadata, +) + + +def _make_image( + filename: str, + time: float, + lat: float, + lon: float, + alt: float = 100.0, + seq_uuid: str | None = None, +) -> ImageMetadata: + return ImageMetadata( + time=time, + lat=lat, + lon=lon, + alt=alt, + angle=45.0, + filename=Path(filename), + MAPSequenceUUID=seq_uuid, + MAPFilename=filename, + ) + + +def _make_video(filename: str, points: list[Point]) -> VideoMetadata: + return VideoMetadata( + filename=Path(filename), + filetype=FileType.CAMM, + points=points, + ) + + +def _make_error(filename: str) -> ErrorMetadata: + return ErrorMetadata( + filename=Path(filename), + filetype=FileType.IMAGE, + error=ValueError("test error"), + ) + + +def _parse_gpx(data: bytes) -> ET.Element: + return ET.fromstring(data.decode("utf-8")) + + +class TestGPXSerializerSerialize: + def test_empty_metadatas(self): + result = GPXSerializer.serialize([]) + root = _parse_gpx(result) + assert root.tag.endswith("gpx") + # No tracks + tracks = root.findall(".//{http://www.topografix.com/GPX/1/1}trk") + assert len(tracks) == 0 + + def test_single_image(self): + img = _make_image("img1.jpg", time=1000.0, lat=48.0, lon=11.0, seq_uuid="seq1") + result = GPXSerializer.serialize([img]) + root = _parse_gpx(result) + tracks = root.findall(".//{http://www.topografix.com/GPX/1/1}trk") + assert len(tracks) == 1 + points = root.findall(".//{http://www.topografix.com/GPX/1/1}trkpt") + assert len(points) == 1 + assert float(points[0].attrib["lat"]) == 48.0 + assert float(points[0].attrib["lon"]) == 11.0 + + def test_multiple_images_same_sequence(self): + imgs = [ + _make_image("img1.jpg", time=1000.0, lat=48.0, lon=11.0, seq_uuid="seq1"), + _make_image("img2.jpg", time=1001.0, lat=48.1, lon=11.1, seq_uuid="seq1"), + ] + result = GPXSerializer.serialize(imgs) + root = _parse_gpx(result) + tracks = root.findall(".//{http://www.topografix.com/GPX/1/1}trk") + assert len(tracks) == 1 + points = root.findall(".//{http://www.topografix.com/GPX/1/1}trkpt") + assert len(points) == 2 + + def test_multiple_sequences(self): + imgs = [ + _make_image("img1.jpg", time=1000.0, lat=48.0, lon=11.0, seq_uuid="seq1"), + _make_image("img2.jpg", time=2000.0, lat=49.0, lon=12.0, seq_uuid="seq2"), + ] + result = GPXSerializer.serialize(imgs) + root = _parse_gpx(result) + tracks = root.findall(".//{http://www.topografix.com/GPX/1/1}trk") + assert len(tracks) == 2 + + def test_video_metadata(self): + pts = [ + Point(time=1.0, lat=48.0, lon=11.0, alt=100.0, angle=0.0), + Point(time=2.0, lat=48.1, lon=11.1, alt=110.0, angle=10.0), + ] + video = _make_video("video.mp4", pts) + result = GPXSerializer.serialize([video]) + root = _parse_gpx(result) + tracks = root.findall(".//{http://www.topografix.com/GPX/1/1}trk") + assert len(tracks) == 1 + points = root.findall(".//{http://www.topografix.com/GPX/1/1}trkpt") + assert len(points) == 2 + + def test_error_metadata(self): + err = _make_error("bad.jpg") + result = GPXSerializer.serialize([err]) + root = _parse_gpx(result) + tracks = root.findall(".//{http://www.topografix.com/GPX/1/1}trk") + assert len(tracks) == 1 + # Error tracks have no track points + points = root.findall(".//{http://www.topografix.com/GPX/1/1}trkpt") + assert len(points) == 0 + + def test_mixed_metadatas(self): + img = _make_image("img1.jpg", time=1000.0, lat=48.0, lon=11.0, seq_uuid="s1") + pts = [Point(time=1.0, lat=49.0, lon=12.0, alt=100.0, angle=0.0)] + video = _make_video("video.mp4", pts) + err = _make_error("bad.jpg") + result = GPXSerializer.serialize([img, video, err]) + root = _parse_gpx(result) + tracks = root.findall(".//{http://www.topografix.com/GPX/1/1}trk") + # 1 error track + 1 image sequence track + 1 video track + assert len(tracks) == 3 + + def test_serialize_returns_utf8_bytes(self): + result = GPXSerializer.serialize([]) + assert isinstance(result, bytes) + text = result.decode("utf-8") + assert "