From e17462f7767412dead9c33b53dee8f82efb850fc Mon Sep 17 00:00:00 2001 From: Caglar Pir Date: Tue, 3 Mar 2026 02:40:56 -0800 Subject: [PATCH 1/2] Add unit tests for distance-based video sampling, GPMF parser, and exiftool video reading - sample_video: Add 20 tests for _within_track_time_range_buffered, _sample_video_stream_by_distance, rerun behavior, and end-to-end distance-based sampling with mocked ffmpeg/geotag/MP4 parsing - test_gpmf_parser: Expand GPMF parser test coverage; remove tests for extract_camera_model (removed in dead-code-cleanup) - test_exiftool_read_video: Add new test file for exiftool video reading --- tests/unit/test_exiftool_read_video.py | 1340 ++++++++++++++++++++++++ tests/unit/test_gpmf_parser.py | 1151 +++++++++++++++++++- tests/unit/test_sample_video.py | 510 ++++++++- 3 files changed, 2992 insertions(+), 9 deletions(-) create mode 100644 tests/unit/test_exiftool_read_video.py diff --git a/tests/unit/test_exiftool_read_video.py b/tests/unit/test_exiftool_read_video.py new file mode 100644 index 00000000..ced1c502 --- /dev/null +++ b/tests/unit/test_exiftool_read_video.py @@ -0,0 +1,1340 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the BSD license found in the +# LICENSE file in the root directory of this source tree. 
+ +from __future__ import annotations + +import xml.etree.ElementTree as ET + +import pytest + +from mapillary_tools.exiftool_read_video import ( + _aggregate_gps_track, + _aggregate_gps_track_by_sample_time, + _aggregate_samples, + _deduplicate_gps_points, + _extract_alternative_fields, + _index_text_by_tag, + _same_gps_point, + ExifToolReadVideo, + expand_tag, +) +from mapillary_tools.telemetry import GPSFix, GPSPoint + + +# --------------------------------------------------------------------------- +# Helper: build an ElementTree from raw XML strings +# --------------------------------------------------------------------------- + + +def _etree_from_xml(xml_str: str) -> ET.ElementTree: + """Parse XML and return an ElementTree rooted at rdf:Description. + + In production, ExifToolReadVideo receives an ElementTree whose root is the + rdf:Description element (not the enclosing rdf:RDF). This helper mimics + that behaviour: it parses the full XML, finds the first rdf:Description + child, and wraps it in an ElementTree. + """ + rdf_root = ET.fromstring(xml_str) + rdf_ns = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" + desc = rdf_root.find(f"{{{rdf_ns}}}Description") + if desc is None: + raise ValueError("No rdf:Description found in the XML") + return ET.ElementTree(desc) + + +def _make_element(tag: str, text: str) -> ET.Element: + el = ET.Element(expand_tag(tag)) + el.text = text + return el + + +# --------------------------------------------------------------------------- +# Real-world-style XML fixtures +# --------------------------------------------------------------------------- + +BLACKVUE_XML = """\ + + + + BlackVue + DR900S-2CH + BV900S123456 + 2019:09:02 10:23:28.00Z + 37.265547 + 28.213497 + 52.7561 + 133.46 + 402.9 + 2019:09:02 10:23:29.00Z + 37.265461 + 28.213611 + 50.0466 + 133.11 + 402.8 + 2019:09:02 10:23:30.00Z + 37.265378 + 28.213722 + 47.6205 + 132.83 + 402.8 + + +""" + +# GoPro Track-based GPS (Track namespace). 
+# Note: SampleTime/SampleDuration are plain floats (exiftool XML numeric output). +GOPRO_XML = """\ + + + + GoPro + GoPro Max + C3456789012345 + 0 + 1.001 + 47.359832 + 8.522706 + 414.9 + 2022:07:31 00:25:23.200Z + 5.376 + 167.58 + 3 + 2.19 + 1.001 + 1.001 + 47.359810 + 8.522680 + 415.2 + 2022:07:31 00:25:24.200Z + 4.992 + 168.23 + 3 + 2.15 + + +""" + +INSTA360_XML = """\ + + + + Insta360 + Insta360 X3 + ISN12345678 + 2023:09:23 15:13:34.00Z + 47.371 + 8.542 + 408.5 + 2023:09:23 15:13:35.00Z + 47.372 + 8.543 + 409.1 + + +""" + + +# --------------------------------------------------------------------------- +# _index_text_by_tag +# --------------------------------------------------------------------------- + + +class TestIndexTextByTag: + def test_basic_indexing(self): + elements = [ + _make_element("QuickTime:GPSLatitude", "37.0"), + _make_element("QuickTime:GPSLongitude", "28.0"), + _make_element("QuickTime:GPSLatitude", "38.0"), + ] + result = _index_text_by_tag(elements) + lat_tag = expand_tag("QuickTime:GPSLatitude") + lon_tag = expand_tag("QuickTime:GPSLongitude") + assert result[lat_tag] == ["37.0", "38.0"] + assert result[lon_tag] == ["28.0"] + + def test_empty_elements(self): + result = _index_text_by_tag([]) + assert result == {} + + def test_element_with_no_text_is_skipped(self): + el = ET.Element(expand_tag("QuickTime:GPSLatitude")) + # el.text is None by default + result = _index_text_by_tag([el]) + assert result == {} + + def test_mixed_text_and_none(self): + el1 = _make_element("QuickTime:GPSLatitude", "37.0") + el2 = ET.Element(expand_tag("QuickTime:GPSLatitude")) + el3 = _make_element("QuickTime:GPSLatitude", "38.0") + result = _index_text_by_tag([el1, el2, el3]) + lat_tag = expand_tag("QuickTime:GPSLatitude") + # Only elements with text are included + assert result[lat_tag] == ["37.0", "38.0"] + + +# --------------------------------------------------------------------------- +# _extract_alternative_fields +# 
--------------------------------------------------------------------------- + + +class TestExtractAlternativeFields: + def test_extract_float_value(self): + texts = {expand_tag("QuickTime:GPSLatitude"): ["37.265547"]} + result = _extract_alternative_fields(texts, ["QuickTime:GPSLatitude"], float) + assert result == pytest.approx(37.265547) + + def test_extract_int_value(self): + texts = {expand_tag("Track1:GPSMeasureMode"): ["3"]} + result = _extract_alternative_fields(texts, ["Track1:GPSMeasureMode"], int) + assert result == 3 + + def test_extract_str_value(self): + texts = {expand_tag("IFD0:Make"): ["BlackVue"]} + result = _extract_alternative_fields(texts, ["IFD0:Make"], str) + assert result == "BlackVue" + + def test_extract_list_value(self): + texts = {expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"]} + result = _extract_alternative_fields(texts, ["QuickTime:GPSLatitude"], list) + assert result == ["37.0", "38.0"] + + def test_returns_none_for_missing_field(self): + texts = {expand_tag("QuickTime:GPSLatitude"): ["37.0"]} + result = _extract_alternative_fields(texts, ["QuickTime:GPSLongitude"], float) + assert result is None + + def test_alternative_fallback(self): + """First field is missing, second is present.""" + texts = {expand_tag("UserData:Make"): ["TestMake"]} + result = _extract_alternative_fields(texts, ["IFD0:Make", "UserData:Make"], str) + assert result == "TestMake" + + def test_first_alternative_preferred(self): + """When both fields are present, first is used.""" + texts = { + expand_tag("IFD0:Make"): ["First"], + expand_tag("UserData:Make"): ["Second"], + } + result = _extract_alternative_fields(texts, ["IFD0:Make", "UserData:Make"], str) + assert result == "First" + + def test_invalid_float_returns_none(self): + texts = {expand_tag("QuickTime:GPSLatitude"): ["not_a_number"]} + result = _extract_alternative_fields(texts, ["QuickTime:GPSLatitude"], float) + assert result is None + + def test_invalid_int_returns_none(self): + texts = 
{expand_tag("Track1:GPSMeasureMode"): ["abc"]} + result = _extract_alternative_fields(texts, ["Track1:GPSMeasureMode"], int) + assert result is None + + def test_all_fields_missing_returns_none(self): + result = _extract_alternative_fields({}, ["IFD0:Make", "UserData:Make"], str) + assert result is None + + def test_invalid_field_type_raises(self): + texts = {expand_tag("IFD0:Make"): ["val"]} + with pytest.raises(ValueError, match="Invalid field type"): + _extract_alternative_fields(texts, ["IFD0:Make"], dict) # type: ignore + + +# --------------------------------------------------------------------------- +# _same_gps_point and _deduplicate_gps_points +# --------------------------------------------------------------------------- + + +class TestSameGpsPoint: + def _make_point(self, **overrides) -> GPSPoint: + defaults = dict( + time=0.0, + lat=37.0, + lon=28.0, + alt=400.0, + angle=133.0, + epoch_time=None, + fix=None, + precision=None, + ground_speed=None, + ) + defaults.update(overrides) + return GPSPoint(**defaults) + + def test_identical_points_are_same(self): + p = self._make_point() + assert _same_gps_point(p, p) is True + + def test_different_alt_are_same(self): + """Altitude does not affect sameness.""" + p1 = self._make_point(alt=400.0) + p2 = self._make_point(alt=500.0) + assert _same_gps_point(p1, p2) is True + + def test_different_lat_are_not_same(self): + p1 = self._make_point(lat=37.0) + p2 = self._make_point(lat=38.0) + assert _same_gps_point(p1, p2) is False + + def test_different_lon_are_not_same(self): + p1 = self._make_point(lon=28.0) + p2 = self._make_point(lon=29.0) + assert _same_gps_point(p1, p2) is False + + def test_different_time_are_not_same(self): + p1 = self._make_point(time=0.0) + p2 = self._make_point(time=1.0) + assert _same_gps_point(p1, p2) is False + + def test_different_angle_are_not_same(self): + p1 = self._make_point(angle=100.0) + p2 = self._make_point(angle=200.0) + assert _same_gps_point(p1, p2) is False + + def 
test_different_epoch_time_are_not_same(self): + p1 = self._make_point(epoch_time=1000.0) + p2 = self._make_point(epoch_time=2000.0) + assert _same_gps_point(p1, p2) is False + + def test_different_ground_speed_are_same(self): + """ground_speed is not checked by _same_gps_point.""" + p1 = self._make_point(ground_speed=10.0) + p2 = self._make_point(ground_speed=50.0) + assert _same_gps_point(p1, p2) is True + + +class TestDeduplicateGpsPoints: + def _make_point(self, **overrides) -> GPSPoint: + defaults = dict( + time=0.0, + lat=37.0, + lon=28.0, + alt=400.0, + angle=133.0, + epoch_time=None, + fix=None, + precision=None, + ground_speed=None, + ) + defaults.update(overrides) + return GPSPoint(**defaults) + + def test_empty_track(self): + assert _deduplicate_gps_points([], _same_gps_point) == [] + + def test_no_duplicates(self): + track = [ + self._make_point(time=0.0, lat=37.0), + self._make_point(time=1.0, lat=37.1), + ] + result = _deduplicate_gps_points(track, _same_gps_point) + assert len(result) == 2 + + def test_consecutive_duplicates_removed(self): + p = self._make_point(time=0.0, lat=37.0) + track = [p, p, p] + result = _deduplicate_gps_points(track, _same_gps_point) + assert len(result) == 1 + + def test_non_consecutive_duplicates_kept(self): + p1 = self._make_point(time=0.0, lat=37.0) + p2 = self._make_point(time=1.0, lat=38.0) + p3 = self._make_point(time=0.0, lat=37.0) # same as p1 but not consecutive + track = [p1, p2, p3] + result = _deduplicate_gps_points(track, _same_gps_point) + assert len(result) == 3 + + def test_only_alt_differs_is_duplicate(self): + """Points differing only by altitude are considered same.""" + p1 = self._make_point(alt=400.0) + p2 = self._make_point(alt=500.0) + result = _deduplicate_gps_points([p1, p2], _same_gps_point) + assert len(result) == 1 + + +# --------------------------------------------------------------------------- +# _aggregate_gps_track +# --------------------------------------------------------------------------- 
+ + +class TestAggregateGpsTrack: + def test_basic_quicktime_track(self): + texts = { + expand_tag("QuickTime:GPSLongitude"): [ + "28.213497", + "28.213611", + "28.213722", + ], + expand_tag("QuickTime:GPSLatitude"): [ + "37.265547", + "37.265461", + "37.265378", + ], + expand_tag("QuickTime:GPSDateTime"): [ + "2019:09:02 10:23:28.00Z", + "2019:09:02 10:23:29.00Z", + "2019:09:02 10:23:30.00Z", + ], + expand_tag("QuickTime:GPSAltitude"): ["402.9", "402.8", "402.8"], + expand_tag("QuickTime:GPSTrack"): ["133.46", "133.11", "132.83"], + } + track = _aggregate_gps_track( + texts, + time_tag="QuickTime:GPSDateTime", + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + alt_tag="QuickTime:GPSAltitude", + direction_tag="QuickTime:GPSTrack", + gps_time_tag="QuickTime:GPSDateTime", + ) + assert len(track) == 3 + # Times should be normalized relative to first point + assert track[0].time == pytest.approx(0.0) + assert track[1].time == pytest.approx(1.0) + assert track[2].time == pytest.approx(2.0) + # Check coordinates + assert track[0].lat == pytest.approx(37.265547) + assert track[0].lon == pytest.approx(28.213497) + assert track[0].alt == pytest.approx(402.9) + assert track[0].angle == pytest.approx(133.46) + # epoch_time should be set from gps_time_tag + for p in track: + assert p.epoch_time is not None + + def test_mismatched_lon_lat_returns_empty(self): + texts = { + expand_tag("QuickTime:GPSLongitude"): ["28.0", "29.0"], + expand_tag("QuickTime:GPSLatitude"): ["37.0"], + } + track = _aggregate_gps_track( + texts, + time_tag=None, + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + ) + assert track == [] + + def test_mismatched_timestamps_returns_empty(self): + texts = { + expand_tag("QuickTime:GPSLongitude"): ["28.0", "29.0"], + expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"], + expand_tag("QuickTime:GPSDateTime"): [ + "2019:09:02 10:23:28.00Z", + ], + } + track = _aggregate_gps_track( + texts, + 
time_tag="QuickTime:GPSDateTime", + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + ) + assert track == [] + + def test_no_time_tag_uses_zero(self): + texts = { + expand_tag("QuickTime:GPSLongitude"): ["28.0", "29.0"], + expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"], + } + track = _aggregate_gps_track( + texts, + time_tag=None, + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + ) + # Without time_tag, all times are 0.0 but points differ in lat/lon + # Deduplication checks time+lat+lon so both should remain + assert len(track) == 2 + for p in track: + assert p.time == 0.0 + assert p.epoch_time is None + + def test_empty_track(self): + track = _aggregate_gps_track( + {}, + time_tag=None, + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + ) + assert track == [] + + def test_none_lon_lat_values_skipped(self): + """If a value cannot be parsed as float, it is skipped.""" + texts = { + expand_tag("QuickTime:GPSLongitude"): ["28.0", "invalid"], + expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"], + expand_tag("QuickTime:GPSDateTime"): [ + "2019:09:02 10:00:00Z", + "2019:09:02 10:00:01Z", + ], + } + track = _aggregate_gps_track( + texts, + time_tag="QuickTime:GPSDateTime", + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + ) + # Second point has invalid lon so it gets skipped + assert len(track) == 1 + + def test_altitude_and_direction_shorter_padded_with_none(self): + """Optional arrays shorter than coord arrays are padded with None.""" + texts = { + expand_tag("QuickTime:GPSLongitude"): ["28.0", "29.0"], + expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"], + expand_tag("QuickTime:GPSAltitude"): ["400.0"], # only 1 for 2 coords + } + track = _aggregate_gps_track( + texts, + time_tag=None, + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + alt_tag="QuickTime:GPSAltitude", + ) + assert len(track) == 2 + assert track[0].alt == pytest.approx(400.0) + 
assert track[1].alt is None + + def test_ground_speed_tag(self): + texts = { + expand_tag("QuickTime:GPSLongitude"): ["28.0"], + expand_tag("QuickTime:GPSLatitude"): ["37.0"], + expand_tag("QuickTime:GPSSpeed"): ["52.7561"], + } + track = _aggregate_gps_track( + texts, + time_tag=None, + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + ground_speed_tag="QuickTime:GPSSpeed", + ) + assert len(track) == 1 + assert track[0].ground_speed == pytest.approx(52.7561) + + def test_gps_time_tag_length_mismatch_falls_back_to_none(self): + texts = { + expand_tag("QuickTime:GPSLongitude"): ["28.0", "29.0"], + expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"], + expand_tag("QuickTime:GPSDateTime"): [ + "2019:09:02 10:00:00Z", + "2019:09:02 10:00:01Z", + ], + expand_tag("QuickTime:GPSTimeStamp"): [ + "2019:09:02 10:00:10Z", + ], + } + track = _aggregate_gps_track( + texts, + time_tag="QuickTime:GPSDateTime", + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + gps_time_tag="QuickTime:GPSTimeStamp", + ) + assert len(track) == 2 + # Mismatch in gps_time_tag length: epoch_time falls back to None + for p in track: + assert p.epoch_time is None + + def test_track_is_sorted_by_time(self): + """Points with out-of-order timestamps are sorted.""" + texts = { + expand_tag("QuickTime:GPSLongitude"): ["28.0", "29.0"], + expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"], + expand_tag("QuickTime:GPSDateTime"): [ + "2019:09:02 10:00:02Z", + "2019:09:02 10:00:00Z", + ], + } + track = _aggregate_gps_track( + texts, + time_tag="QuickTime:GPSDateTime", + lon_tag="QuickTime:GPSLongitude", + lat_tag="QuickTime:GPSLatitude", + ) + assert len(track) == 2 + assert track[0].time < track[1].time + + +# --------------------------------------------------------------------------- +# _aggregate_samples +# --------------------------------------------------------------------------- + + +class TestAggregateSamples: + def test_basic_two_samples(self): + elements = 
[ + _make_element("Track1:SampleTime", "0"), + _make_element("Track1:SampleDuration", "1.001"), + _make_element("Track1:GPSLatitude", "47.0"), + _make_element("Track1:GPSLongitude", "8.0"), + _make_element("Track1:SampleTime", "1.001"), + _make_element("Track1:SampleDuration", "1.001"), + _make_element("Track1:GPSLatitude", "47.1"), + _make_element("Track1:GPSLongitude", "8.1"), + ] + samples = list( + _aggregate_samples(elements, "Track1:SampleTime", "Track1:SampleDuration") + ) + assert len(samples) == 2 + # First sample + sample_time, sample_dur, elems = samples[0] + assert sample_time == pytest.approx(0.0) + assert sample_dur == pytest.approx(1.001) + # Elements are GPS data without SampleTime/SampleDuration + assert len(elems) == 2 + # Second sample + sample_time2, sample_dur2, elems2 = samples[1] + assert sample_time2 == pytest.approx(1.001) + assert sample_dur2 == pytest.approx(1.001) + assert len(elems2) == 2 + + def test_empty_elements(self): + samples = list( + _aggregate_samples([], "Track1:SampleTime", "Track1:SampleDuration") + ) + assert samples == [] + + def test_sample_time_none_skips_sample(self): + """If sample time cannot be parsed as float, the sample is skipped.""" + elements = [ + _make_element("Track1:SampleTime", "not_a_number"), + _make_element("Track1:SampleDuration", "1.0"), + _make_element("Track1:GPSLatitude", "47.0"), + ] + samples = list( + _aggregate_samples(elements, "Track1:SampleTime", "Track1:SampleDuration") + ) + # sample_time is None so the last yield won't happen for that group + assert len(samples) == 0 + + def test_missing_duration_skips_sample(self): + """If SampleDuration is missing, sample is not yielded.""" + elements = [ + _make_element("Track1:SampleTime", "0"), + # No SampleDuration element + _make_element("Track1:GPSLatitude", "47.0"), + ] + samples = list( + _aggregate_samples(elements, "Track1:SampleTime", "Track1:SampleDuration") + ) + # sample_duration is None, so the sample is skipped + assert len(samples) == 0 
+ + def test_last_sample_is_yielded(self): + """The final sample (not followed by another SampleTime) is also yielded.""" + elements = [ + _make_element("Track1:SampleTime", "0"), + _make_element("Track1:SampleDuration", "2.0"), + _make_element("Track1:GPSLatitude", "47.0"), + ] + samples = list( + _aggregate_samples(elements, "Track1:SampleTime", "Track1:SampleDuration") + ) + assert len(samples) == 1 + sample_time, sample_dur, elems = samples[0] + assert sample_time == pytest.approx(0.0) + assert sample_dur == pytest.approx(2.0) + assert len(elems) == 1 + + +# --------------------------------------------------------------------------- +# _aggregate_gps_track_by_sample_time +# --------------------------------------------------------------------------- + + +class TestAggregateGpsTrackBySampleTime: + def test_basic_sample_aggregation(self): + track_ns = "Track1" + elements = [ + _make_element(f"{track_ns}:GPSLongitude", "8.522706"), + _make_element(f"{track_ns}:GPSLatitude", "47.359832"), + _make_element(f"{track_ns}:GPSAltitude", "414.9"), + _make_element(f"{track_ns}:GPSDateTime", "2022:07:31 00:25:23.200Z"), + _make_element(f"{track_ns}:GPSSpeed", "5.376"), + _make_element(f"{track_ns}:GPSTrack", "167.58"), + ] + sample_iterator = [(0.0, 1.001, elements)] + track = _aggregate_gps_track_by_sample_time( + sample_iterator, + lon_tag=f"{track_ns}:GPSLongitude", + lat_tag=f"{track_ns}:GPSLatitude", + alt_tag=f"{track_ns}:GPSAltitude", + gps_time_tag=f"{track_ns}:GPSDateTime", + direction_tag=f"{track_ns}:GPSTrack", + ground_speed_tag=f"{track_ns}:GPSSpeed", + ) + assert len(track) == 1 + assert track[0].lat == pytest.approx(47.359832) + assert track[0].lon == pytest.approx(8.522706) + assert track[0].alt == pytest.approx(414.9) + assert track[0].ground_speed == pytest.approx(5.376) + assert track[0].angle == pytest.approx(167.58) + + def test_gps_fix_from_measure_mode(self): + track_ns = "Track1" + elements = [ + _make_element(f"{track_ns}:GPSLongitude", "8.0"), + 
_make_element(f"{track_ns}:GPSLatitude", "47.0"), + _make_element(f"{track_ns}:GPSMeasureMode", "3"), + ] + sample_iterator = [(0.0, 1.0, elements)] + track = _aggregate_gps_track_by_sample_time( + sample_iterator, + lon_tag=f"{track_ns}:GPSLongitude", + lat_tag=f"{track_ns}:GPSLatitude", + gps_fix_tag=f"{track_ns}:GPSMeasureMode", + ) + assert len(track) == 1 + assert track[0].fix == GPSFix.FIX_3D + + def test_gps_fix_2d(self): + track_ns = "Track1" + elements = [ + _make_element(f"{track_ns}:GPSLongitude", "8.0"), + _make_element(f"{track_ns}:GPSLatitude", "47.0"), + _make_element(f"{track_ns}:GPSMeasureMode", "2"), + ] + sample_iterator = [(0.0, 1.0, elements)] + track = _aggregate_gps_track_by_sample_time( + sample_iterator, + lon_tag=f"{track_ns}:GPSLongitude", + lat_tag=f"{track_ns}:GPSLatitude", + gps_fix_tag=f"{track_ns}:GPSMeasureMode", + ) + assert len(track) == 1 + assert track[0].fix == GPSFix.FIX_2D + + def test_gps_fix_no_fix(self): + track_ns = "Track1" + elements = [ + _make_element(f"{track_ns}:GPSLongitude", "8.0"), + _make_element(f"{track_ns}:GPSLatitude", "47.0"), + _make_element(f"{track_ns}:GPSMeasureMode", "0"), + ] + sample_iterator = [(0.0, 1.0, elements)] + track = _aggregate_gps_track_by_sample_time( + sample_iterator, + lon_tag=f"{track_ns}:GPSLongitude", + lat_tag=f"{track_ns}:GPSLatitude", + gps_fix_tag=f"{track_ns}:GPSMeasureMode", + ) + assert len(track) == 1 + assert track[0].fix == GPSFix.NO_FIX + + def test_invalid_gps_fix_is_none(self): + track_ns = "Track1" + elements = [ + _make_element(f"{track_ns}:GPSLongitude", "8.0"), + _make_element(f"{track_ns}:GPSLatitude", "47.0"), + _make_element(f"{track_ns}:GPSMeasureMode", "99"), + ] + sample_iterator = [(0.0, 1.0, elements)] + track = _aggregate_gps_track_by_sample_time( + sample_iterator, + lon_tag=f"{track_ns}:GPSLongitude", + lat_tag=f"{track_ns}:GPSLatitude", + gps_fix_tag=f"{track_ns}:GPSMeasureMode", + ) + assert len(track) == 1 + assert track[0].fix is None + + def 
test_gps_precision_scaled(self): + """GPS precision should be multiplied by 100 (meters to GPSP units).""" + track_ns = "Track1" + elements = [ + _make_element(f"{track_ns}:GPSLongitude", "8.0"), + _make_element(f"{track_ns}:GPSLatitude", "47.0"), + _make_element(f"{track_ns}:GPSHPositioningError", "2.19"), + ] + sample_iterator = [(0.0, 1.0, elements)] + track = _aggregate_gps_track_by_sample_time( + sample_iterator, + lon_tag=f"{track_ns}:GPSLongitude", + lat_tag=f"{track_ns}:GPSLatitude", + gps_precision_tag=f"{track_ns}:GPSHPositioningError", + ) + assert len(track) == 1 + assert track[0].precision == pytest.approx(219.0) + + def test_multiple_points_per_sample_get_interpolated_time(self): + """Multiple GPS points within a single sample get evenly spaced times.""" + track_ns = "Track1" + elements = [ + _make_element(f"{track_ns}:GPSLongitude", "8.0"), + _make_element(f"{track_ns}:GPSLatitude", "47.0"), + _make_element(f"{track_ns}:GPSLongitude", "8.1"), + _make_element(f"{track_ns}:GPSLatitude", "47.1"), + ] + sample_iterator = [(0.0, 2.0, elements)] + track = _aggregate_gps_track_by_sample_time( + sample_iterator, + lon_tag=f"{track_ns}:GPSLongitude", + lat_tag=f"{track_ns}:GPSLatitude", + ) + assert len(track) == 2 + assert track[0].time == pytest.approx(0.0) + assert track[1].time == pytest.approx(1.0) + + def test_empty_sample_iterator(self): + track = _aggregate_gps_track_by_sample_time( + iter([]), + lon_tag="Track1:GPSLongitude", + lat_tag="Track1:GPSLatitude", + ) + assert track == [] + + def test_sample_with_no_gps_data(self): + """A sample with no GPS elements produces no points.""" + sample_iterator = [(0.0, 1.0, [])] + track = _aggregate_gps_track_by_sample_time( + sample_iterator, + lon_tag="Track1:GPSLongitude", + lat_tag="Track1:GPSLatitude", + ) + assert track == [] + + def test_two_samples_sorted_by_time(self): + track_ns = "Track1" + # Sample 2 comes before sample 1 in input + elements1 = [ + _make_element(f"{track_ns}:GPSLongitude", "8.0"), + 
_make_element(f"{track_ns}:GPSLatitude", "47.0"), + ] + elements2 = [ + _make_element(f"{track_ns}:GPSLongitude", "8.1"), + _make_element(f"{track_ns}:GPSLatitude", "47.1"), + ] + sample_iterator = [ + (2.0, 1.0, elements2), + (0.0, 1.0, elements1), + ] + track = _aggregate_gps_track_by_sample_time( + sample_iterator, + lon_tag=f"{track_ns}:GPSLongitude", + lat_tag=f"{track_ns}:GPSLatitude", + ) + assert len(track) == 2 + assert track[0].time < track[1].time + + +# --------------------------------------------------------------------------- +# ExifToolReadVideo.__init__ +# --------------------------------------------------------------------------- + + +class TestExifToolReadVideoInit: + def test_init_from_blackvue_xml(self): + etree = _etree_from_xml(BLACKVUE_XML) + reader = ExifToolReadVideo(etree) + assert reader.etree is etree + # Internal state should be populated + assert len(reader._texts_by_tag) > 0 + assert len(reader._all_tags) > 0 + + def test_init_from_gopro_xml(self): + etree = _etree_from_xml(GOPRO_XML) + reader = ExifToolReadVideo(etree) + assert reader.etree is etree + + def test_init_from_insta360_xml(self): + etree = _etree_from_xml(INSTA360_XML) + reader = ExifToolReadVideo(etree) + assert reader.etree is etree + + +# --------------------------------------------------------------------------- +# ExifToolReadVideo.extract_make / extract_model / _extract_make_and_model +# --------------------------------------------------------------------------- + + +class TestExtractMakeAndModel: + def test_blackvue_make_and_model(self): + etree = _etree_from_xml(BLACKVUE_XML) + reader = ExifToolReadVideo(etree) + assert reader.extract_make() == "BlackVue" + assert reader.extract_model() == "DR900S-2CH" + + def test_gopro_make_and_model(self): + etree = _etree_from_xml(GOPRO_XML) + reader = ExifToolReadVideo(etree) + assert reader.extract_make() == "GoPro" + assert reader.extract_model() == "GoPro Max" + + def test_insta360_make_and_model(self): + etree = 
_etree_from_xml(INSTA360_XML) + reader = ExifToolReadVideo(etree) + assert reader.extract_make() == "Insta360" + assert reader.extract_model() == "Insta360 X3" + + def test_gopro_make_defaults_when_missing(self): + """If GoPro:Model is present but GoPro:Make is missing, make defaults to 'GoPro'.""" + xml = """\ + + + + HERO11 + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_make() == "GoPro" + assert reader.extract_model() == "HERO11" + + def test_insta360_make_defaults_when_missing(self): + """If Insta360:Model is present but Insta360:Make is missing, make defaults to 'Insta360'.""" + xml = """\ + + + + ONE RS + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_make() == "Insta360" + assert reader.extract_model() == "ONE RS" + + def test_no_make_no_model(self): + """When no make/model tags exist, both return None.""" + xml = """\ + + + + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_make() is None + assert reader.extract_model() is None + + def test_make_with_whitespace_stripped(self): + xml = """\ + + + + SomeMake + SomeModel + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_make() == "SomeMake" + assert reader.extract_model() == "SomeModel" + + def test_gopro_takes_priority_over_ifd0(self): + """GoPro namespace is checked first, so it takes priority over IFD0.""" + xml = """\ + + + + GoPro + HERO12 + OtherMake + OtherModel + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_make() == "GoPro" + assert reader.extract_model() == "HERO12" + + def test_userdata_make_fallback(self): + """UserData:Make is used when IFD0:Make is not present.""" + xml = """\ + + + + UserDataMake + UserDataModel + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_make() == "UserDataMake" + assert reader.extract_model() == "UserDataModel" + + def test_make_without_model(self): + """IFD0:Make 
present but no model tag anywhere returns (make, None).""" + xml = """\ + + + + JustMake + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_make() == "JustMake" + assert reader.extract_model() is None + + +# --------------------------------------------------------------------------- +# ExifToolReadVideo.extract_camera_uuid +# --------------------------------------------------------------------------- + + +class TestExtractCameraUUID: + def test_blackvue_serial(self): + etree = _etree_from_xml(BLACKVUE_XML) + reader = ExifToolReadVideo(etree) + assert reader.extract_camera_uuid() == "BV900S123456" + + def test_gopro_serial(self): + etree = _etree_from_xml(GOPRO_XML) + reader = ExifToolReadVideo(etree) + assert reader.extract_camera_uuid() == "C3456789012345" + + def test_insta360_serial(self): + etree = _etree_from_xml(INSTA360_XML) + reader = ExifToolReadVideo(etree) + assert reader.extract_camera_uuid() == "ISN12345678" + + def test_no_serial_returns_none(self): + xml = """\ + + + + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_camera_uuid() is None + + def test_body_and_lens_serial_combined(self): + xml = """\ + + + + BODY123 + LENS456 + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_camera_uuid() == "BODY123_LENS456" + + def test_serial_with_special_chars_sanitized(self): + """Serial numbers with non-alphanumeric characters are sanitized.""" + xml = """\ + + + + SN-123_456 + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + # sanitize_serial removes non-alphanumeric chars + assert reader.extract_camera_uuid() == "SN123456" + + def test_empty_serial_after_sanitization_returns_none(self): + """A serial that becomes empty after sanitization yields None.""" + xml = """\ + + + + --- + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_camera_uuid() is None + + def test_gopro_serial_priority_over_generic(self): + 
"""GoPro:SerialNumber is checked before generic ExifIFD:SerialNumber.""" + xml = """\ + + + + GPSERIAL + EXIFSERIAL + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_camera_uuid() == "GPSERIAL" + + def test_dji_serial(self): + xml = """\ + + + + DJI123456 + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_camera_uuid() == "DJI123456" + + def test_lens_serial_only(self): + """Only lens serial, no body serial.""" + xml = """\ + + + + LENS789 + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_camera_uuid() == "LENS789" + + +# --------------------------------------------------------------------------- +# ExifToolReadVideo.extract_gps_track (integration) +# --------------------------------------------------------------------------- + + +class TestExtractGpsTrack: + def test_blackvue_quicktime_gps(self): + """BlackVue uses QuickTime namespace - the first path in extract_gps_track.""" + etree = _etree_from_xml(BLACKVUE_XML) + reader = ExifToolReadVideo(etree) + track = reader.extract_gps_track() + assert len(track) == 3 + # Time should be normalized (starts at 0) + assert track[0].time == pytest.approx(0.0) + assert track[1].time == pytest.approx(1.0) + assert track[2].time == pytest.approx(2.0) + # Check coordinates of first point + assert track[0].lat == pytest.approx(37.265547) + assert track[0].lon == pytest.approx(28.213497) + assert track[0].alt == pytest.approx(402.9) + assert track[0].angle == pytest.approx(133.46) + + def test_insta360_gps(self): + """Insta360 uses Insta360 namespace - the second path in extract_gps_track.""" + etree = _etree_from_xml(INSTA360_XML) + reader = ExifToolReadVideo(etree) + track = reader.extract_gps_track() + assert len(track) == 2 + assert track[0].lat == pytest.approx(47.371) + assert track[0].lon == pytest.approx(8.542) + assert track[0].alt == pytest.approx(408.5) + + def test_gopro_track_gps(self): + """GoPro uses Track namespace 
- the third path in extract_gps_track.""" + etree = _etree_from_xml(GOPRO_XML) + reader = ExifToolReadVideo(etree) + track = reader.extract_gps_track() + assert len(track) == 2 + assert track[0].lat == pytest.approx(47.359832) + assert track[0].lon == pytest.approx(8.522706) + + def test_empty_gps_track(self): + """When no GPS data is present, returns empty list.""" + xml = """\ + + + + Unknown + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + track = reader.extract_gps_track() + assert track == [] + + def test_quicktime_preferred_over_track(self): + """If both QuickTime and Track GPS data exist, QuickTime is used.""" + xml = """\ + + + + 2019:09:02 10:23:28.00Z + 37.0 + 28.0 + 0 + 1.0 + 47.0 + 8.0 + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + track = reader.extract_gps_track() + # QuickTime is preferred, so lat should be ~37, not ~47 + assert len(track) >= 1 + assert track[0].lat == pytest.approx(37.0) + + +# --------------------------------------------------------------------------- +# ExifToolReadVideo._extract_gps_track_from_quicktime +# --------------------------------------------------------------------------- + + +class TestExtractGpsTrackFromQuicktime: + def test_missing_required_tags_returns_empty(self): + """Without all three required tags, returns empty list.""" + xml = """\ + + + + 37.0 + 28.0 + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + # GPSDateTime is missing, so _extract_gps_track_from_quicktime returns [] + track = reader._extract_gps_track_from_quicktime() + assert track == [] + + def test_custom_namespace_insta360(self): + etree = _etree_from_xml(INSTA360_XML) + reader = ExifToolReadVideo(etree) + track = reader._extract_gps_track_from_quicktime(namespace="Insta360") + assert len(track) == 2 + assert track[0].lat == pytest.approx(47.371) + + def test_custom_namespace_not_present(self): + """Using a namespace that has no data returns empty.""" + etree = _etree_from_xml(BLACKVUE_XML) + reader = 
ExifToolReadVideo(etree) + track = reader._extract_gps_track_from_quicktime(namespace="Insta360") + assert track == [] + + +# --------------------------------------------------------------------------- +# ExifToolReadVideo._extract_gps_track_from_track +# --------------------------------------------------------------------------- + + +class TestExtractGpsTrackFromTrack: + def test_gopro_track_data(self): + etree = _etree_from_xml(GOPRO_XML) + reader = ExifToolReadVideo(etree) + track = reader._extract_gps_track_from_track() + assert len(track) == 2 + assert track[0].lat == pytest.approx(47.359832) + assert track[0].lon == pytest.approx(8.522706) + assert track[0].alt == pytest.approx(414.9) + # GPSMeasureMode=3 -> FIX_3D + assert track[0].fix == GPSFix.FIX_3D + # GPSHPositioningError=2.19 -> 219.0 after *100 + assert track[0].precision == pytest.approx(219.0) + + def test_no_track_data_returns_empty(self): + etree = _etree_from_xml(BLACKVUE_XML) + reader = ExifToolReadVideo(etree) + # BlackVue doesn't have Track namespace data + track = reader._extract_gps_track_from_track() + assert track == [] + + def test_track2_namespace(self): + """GPS data in Track2 (not Track1) is still found.""" + xml = """\ + + + + 0 + 1.0 + 47.0 + 8.0 + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + track = reader._extract_gps_track_from_track() + assert len(track) == 1 + assert track[0].lat == pytest.approx(47.0) + + def test_track_with_incomplete_tags_skipped(self): + """Track namespace missing SampleTime/SampleDuration is skipped.""" + xml = """\ + + + + 47.0 + 8.0 + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + track = reader._extract_gps_track_from_track() + assert track == [] + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + + +class TestEdgeCases: + def test_single_gps_point(self): + """Single GPS point still works.""" + xml = 
"""\ + + + + 2019:09:02 10:23:28.00Z + 37.0 + 28.0 + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + track = reader.extract_gps_track() + assert len(track) == 1 + assert track[0].time == pytest.approx(0.0) + + def test_duplicate_points_removed(self): + """Consecutive identical GPS points are deduplicated.""" + xml = """\ + + + + 2019:09:02 10:23:28.00Z + 37.0 + 28.0 + 2019:09:02 10:23:28.00Z + 37.0 + 28.0 + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + track = reader.extract_gps_track() + assert len(track) == 1 + + def test_rdf_description_with_no_children(self): + xml = """\ + + + + + +""" + reader = ExifToolReadVideo(_etree_from_xml(xml)) + assert reader.extract_gps_track() == [] + assert reader.extract_make() is None + assert reader.extract_model() is None + assert reader.extract_camera_uuid() is None diff --git a/tests/unit/test_gpmf_parser.py b/tests/unit/test_gpmf_parser.py index ca372758..bae30ce1 100644 --- a/tests/unit/test_gpmf_parser.py +++ b/tests/unit/test_gpmf_parser.py @@ -3,11 +3,1152 @@ # This source code is licensed under the BSD license found in the # LICENSE file in the root directory of this source tree. 
import datetime
import os
from pathlib import Path

import pytest

from mapillary_tools import telemetry
from mapillary_tools.gpmf import gpmf_parser


# ---------------------------------------------------------------------------
# Test data file paths (used for integration-style tests)
# ---------------------------------------------------------------------------
# The sample videos are looked up under a single root directory.  The root
# defaults to the historical hard-coded location and can be redirected by
# setting the MLY_TEST_DATA_DIR environment variable, so the integration-style
# tests are not tied to one machine's /tmp layout.
_TEST_DATA_ROOT = Path(
    os.environ.get("MLY_TEST_DATA_DIR", "/tmp/mly_coverage_test/data/mly_test_data")
)

GPS5_VIDEO = _TEST_DATA_ROOT / "GoPro MAX (works)" / "video 24 FPS" / "GS011498.360"
GPS9_VIDEO = (
    _TEST_DATA_ROOT
    / "GoPro MAX 2 (works)"
    / "360 Video (including lower res)"
    / "GS015637.360"
)
HERO7_VIDEO = _TEST_DATA_ROOT / "GoPro Hero 7 (works)" / "GH010359.MP4"

# Evaluated once at import time; True only when the sample file is present.
_has_gps5_video = GPS5_VIDEO.exists()
_has_gps9_video = GPS9_VIDEO.exists()
_has_hero7_video = HERO7_VIDEO.exists()


# ---------------------------------------------------------------------------
# Helpers to construct realistic KLVDict structures
# ---------------------------------------------------------------------------
def _make_klv(
    key: bytes, type_char: bytes, data, structure_size: int = 0, repeat: int = 0
):
    """Build a minimal KLVDict-like dict with the fields the parser uses.

    Args:
        key: Four-character KLV key, e.g. ``b"GPS5"``.
        type_char: Type byte stored under ``"type"``.
        data: Payload stored under ``"data"`` as-is (not copied).
        structure_size: Value stored under ``"structure_size"``.
        repeat: Value stored under ``"repeat"``.

    Returns:
        A new dict with the keys ``key``, ``type``, ``structure_size``,
        ``repeat`` and ``data``.
    """
    return {
        "key": key,
        "type": type_char,
        "structure_size": structure_size,
        "repeat": repeat,
        "data": data,
    }


# ---------------------------------------------------------------------------
# 1.
# _gps5_timestamp_to_epoch_time
# ---------------------------------------------------------------------------
class TestGps5TimestampToEpochTime:
    """Compare ``_gps5_timestamp_to_epoch_time`` against UTC datetimes."""

    @staticmethod
    def _utc_epoch(*fields):
        """Return the POSIX timestamp of the UTC datetime built from *fields*."""
        moment = datetime.datetime(*fields, tzinfo=datetime.timezone.utc)
        return moment.timestamp()

    def test_known_timestamp(self):
        # GPSU from the GoPro MAX file: '230117115504.225' means
        # 2023-01-17 11:55:04.225 UTC
        expected = self._utc_epoch(2023, 1, 17, 11, 55, 4, 225000)
        actual = gpmf_parser._gps5_timestamp_to_epoch_time("230117115504.225")
        assert actual == pytest.approx(expected, abs=0.001)

    def test_midnight_timestamp(self):
        expected = self._utc_epoch(2023, 1, 1, 0, 0, 0, 0)
        actual = gpmf_parser._gps5_timestamp_to_epoch_time("230101000000.000")
        assert actual == pytest.approx(expected, abs=0.001)

    def test_end_of_year(self):
        expected = self._utc_epoch(2023, 12, 31, 23, 59, 59, 999000)
        actual = gpmf_parser._gps5_timestamp_to_epoch_time("231231235959.999")
        assert actual == pytest.approx(expected, abs=0.001)


# ---------------------------------------------------------------------------
# 2.
# _gps9_timestamp_to_epoch_time
# ---------------------------------------------------------------------------
class TestGps9TimestampToEpochTime:
    """Check ``_gps9_timestamp_to_epoch_time(days_since_2000, secs_since_midnight)``."""

    @staticmethod
    def _utc_epoch(*fields):
        """Return the POSIX timestamp of the UTC datetime built from *fields*."""
        moment = datetime.datetime(*fields, tzinfo=datetime.timezone.utc)
        return moment.timestamp()

    def test_known_values_from_real_data(self):
        # days_since_2000=9463, secs_since_midnight=44896.0
        # -> 2025-11-28 12:28:16 UTC
        actual = gpmf_parser._gps9_timestamp_to_epoch_time(9463, 44896.0)
        assert actual == pytest.approx(1764332896.0, abs=1.0)

    def test_epoch_at_2000(self):
        # 0 days, 0 secs -> 2000-01-01T00:00:00 UTC
        expected = self._utc_epoch(2000, 1, 1)
        actual = gpmf_parser._gps9_timestamp_to_epoch_time(0, 0.0)
        assert actual == pytest.approx(expected, abs=0.001)

    def test_one_day_after_2000(self):
        expected = self._utc_epoch(2000, 1, 2)
        actual = gpmf_parser._gps9_timestamp_to_epoch_time(1, 0.0)
        assert actual == pytest.approx(expected, abs=0.001)

    def test_fractional_seconds(self):
        # 0 days, 3600.5 secs -> 2000-01-01T01:00:00.5 UTC
        expected = self._utc_epoch(2000, 1, 1, 1, 0, 0, 500000)
        actual = gpmf_parser._gps9_timestamp_to_epoch_time(0, 3600.5)
        assert actual == pytest.approx(expected, abs=0.001)


# ---------------------------------------------------------------------------
# 3.
# _get_gps_type
# ---------------------------------------------------------------------------
class TestGetGpsType:
    """Exercise ``_get_gps_type`` on flat, nested and degenerate TYPE payloads."""

    def test_flat_bytes_list(self):
        # TYPE data from real GPS9 stream: [b'lllllllSS']
        flattened = gpmf_parser._get_gps_type([b"lllllllSS"])
        assert flattened == b"lllllllSS"

    def test_nested_bytes(self):
        # e.g., [b'll', [b'SS', b'bb']]
        flattened = gpmf_parser._get_gps_type([b"ll", [b"SS", b"bb"]])
        assert flattened == b"llSSbb"

    def test_empty_input(self):
        assert gpmf_parser._get_gps_type([]) == b""

    def test_none_input(self):
        assert gpmf_parser._get_gps_type(None) == b""

    def test_unexpected_type_raises(self):
        with pytest.raises(ValueError, match="Unexpected type"):
            gpmf_parser._get_gps_type([123])

    def test_deeply_nested(self):
        flattened = gpmf_parser._get_gps_type([[b"a", [b"b"]]])
        assert flattened == b"ab"


# ---------------------------------------------------------------------------
# 4. _gps5_from_stream - unit tests using constructed KLVDict data
# ---------------------------------------------------------------------------
class TestGps5FromStream:
    """Feed hand-built GPS5 streams to ``_gps5_from_stream`` and inspect the points."""

    @staticmethod
    def _scal():
        """Return a fresh copy of the five-entry SCAL payload used by most tests."""
        return [[10000000], [10000000], [1000], [1000], [100]]

    @staticmethod
    def _max_row():
        """Return a fresh copy of the first raw GPS5 row from the GoPro MAX data."""
        return [473598318, 85227055, 414870, 891, 119]

    def _make_gps5_stream(
        self,
        gps5_data=None,
        scal_data=None,
        gpsf_data=None,
        gpsu_data=None,
        gpsp_data=None,
    ):
        """Build a GPS5 STRM stream with realistic structure.

        Entries whose payload is ``None`` are left out; the rest appear in the
        order GPSF, GPSU, GPSP, SCAL, GPS5.
        """
        layout = (
            (b"GPSF", b"L", gpsf_data),
            (b"GPSU", b"U", gpsu_data),
            (b"GPSP", b"S", gpsp_data),
            (b"SCAL", b"l", scal_data),
            (b"GPS5", b"l", gps5_data),
        )
        return [
            _make_klv(key, type_char, payload)
            for key, type_char, payload in layout
            if payload is not None
        ]

    def _parse(self, **parts):
        """Build a stream from *parts* and return the parsed points as a list."""
        return list(gpmf_parser._gps5_from_stream(self._make_gps5_stream(**parts)))

    def test_basic_gps5_parsing(self):
        """Two GPS5 rows taken from the GoPro MAX data parse into two points."""
        parsed = self._parse(
            gps5_data=[self._max_row(), [473598322, 85227045, 414950, 1064, 95]],
            scal_data=self._scal(),
            gpsf_data=[[3]],
            gpsu_data=[[b"230117115504.225"]],
            gpsp_data=[[219]],
        )
        assert len(parsed) == 2

        first, second = parsed[0], parsed[1]
        assert first.lat == pytest.approx(47.3598318, abs=1e-7)
        assert first.lon == pytest.approx(8.5227055, abs=1e-7)
        assert first.alt == pytest.approx(414.87, abs=0.01)
        assert first.ground_speed == pytest.approx(0.891, abs=0.001)
        assert first.fix == telemetry.GPSFix.FIX_3D
        assert first.precision == 219
        assert first.epoch_time is not None
        assert first.time == 0  # time is always 0 from _gps5_from_stream
        assert first.angle is None

        assert second.lat == pytest.approx(47.3598322, abs=1e-7)
        assert second.lon == pytest.approx(8.5227045, abs=1e-7)

    def test_no_gps5_key(self):
        """A stream without a GPS5 entry yields no points."""
        assert self._parse(scal_data=self._scal()) == []

    def test_no_scal_key(self):
        """A stream without a SCAL entry yields no points."""
        assert self._parse(gps5_data=[self._max_row()]) == []

    def test_zero_scale_value(self):
        """A zero among the SCAL values yields no points."""
        parsed = self._parse(
            gps5_data=[self._max_row()],
            scal_data=[[10000000], [0], [1000], [1000], [100]],
        )
        assert parsed == []

    def test_no_gpsf(self):
        """Without GPSF the point's fix is None."""
        parsed = self._parse(gps5_data=[self._max_row()], scal_data=self._scal())
        assert len(parsed) == 1
        assert parsed[0].fix is None

    def test_gpsf_no_lock(self):
        """GPSF=0 maps to NO_FIX."""
        parsed = self._parse(
            gps5_data=[self._max_row()],
            scal_data=self._scal(),
            gpsf_data=[[0]],
        )
        assert len(parsed) == 1
        assert parsed[0].fix == telemetry.GPSFix.NO_FIX

    def test_gpsf_2d_lock(self):
        """GPSF=2 maps to FIX_2D."""
        parsed = self._parse(
            gps5_data=[self._max_row()],
            scal_data=self._scal(),
            gpsf_data=[[2]],
        )
        assert parsed[0].fix == telemetry.GPSFix.FIX_2D

    def test_no_gpsu(self):
        """Without GPSU the point's epoch_time is None."""
        parsed = self._parse(gps5_data=[self._max_row()], scal_data=self._scal())
        assert len(parsed) == 1
        assert parsed[0].epoch_time is None

    def test_invalid_gpsu(self):
        """An unparsable GPSU string leaves epoch_time as None instead of raising."""
        parsed = self._parse(
            gps5_data=[self._max_row()],
            scal_data=self._scal(),
            gpsu_data=[[b"invalid_gpsu_str"]],
        )
        assert len(parsed) == 1
        assert parsed[0].epoch_time is None

    def test_no_gpsp(self):
        """Without GPSP the point's precision is None."""
        parsed = self._parse(gps5_data=[self._max_row()], scal_data=self._scal())
        assert len(parsed) == 1
        assert parsed[0].precision is None

    def test_epoch_time_same_for_all_points_in_sample(self):
        """Every point of one GPS5 sample carries the same epoch_time."""
        parsed = self._parse(
            gps5_data=[
                self._max_row(),
                [473598322, 85227045, 414950, 1064, 95],
                [473598323, 85227036, 415033, 1139, 111],
            ],
            scal_data=self._scal(),
            gpsu_data=[[b"230117115504.225"]],
        )
        assert len(parsed) == 3
        # All should have the same epoch_time
        assert parsed[0].epoch_time == parsed[1].epoch_time
        assert parsed[1].epoch_time == parsed[2].epoch_time

    def test_negative_coordinates(self):
        """Negative raw values (southern/western hemisphere) keep their sign."""
        # Hero 7 data: lat=34.3985866, lon=-119.6986334 (negative raw lon)
        parsed = self._parse(
            gps5_data=[[343985866, -1196986334, -34141, 313, 50]],
            scal_data=self._scal(),
            gpsf_data=[[3]],
        )
        assert len(parsed) == 1
        only = parsed[0]
        assert only.lat == pytest.approx(34.3985866, abs=1e-7)
        assert only.lon == pytest.approx(-119.6986334, abs=1e-7)
        assert only.alt == pytest.approx(-34.141, abs=0.001)


# ---------------------------------------------------------------------------
# 5.
# _gps9_from_stream - unit tests
# ---------------------------------------------------------------------------
class TestGps9FromStream:
    """Feed hand-built GPS9 streams to ``_gps9_from_stream`` and inspect the points."""

    # TYPE payload describing the nine GPS9 fields (immutable, safe to share).
    _TYPE_DATA = b"lllllllSS"

    @staticmethod
    def _scal():
        """Return a fresh copy of the nine-entry SCAL payload used by most tests."""
        return [
            [10000000],
            [10000000],
            [1000],
            [1000],
            [100],
            [1],
            [1000],
            [100],
            [1],
        ]

    def _make_gps9_stream(
        self,
        gps9_data=None,
        scal_data=None,
        type_data=None,
    ):
        """Build a GPS9 STRM stream with realistic structure.

        Entries whose payload is ``None`` are left out; the rest appear in the
        order TYPE, SCAL, GPS9.
        """
        layout = (
            (b"TYPE", b"c", type_data),
            (b"SCAL", b"l", scal_data),
            (b"GPS9", b"?", gps9_data),
        )
        return [
            _make_klv(key, type_char, payload)
            for key, type_char, payload in layout
            if payload is not None
        ]

    def _build_gps9_sample_bytes(
        self, lat, lon, alt, speed2d, speed3d, days, secs_ms, dop, fix
    ):
        """Encode raw GPS9 values as bytes using the 'lllllllSS' format."""
        import struct

        fields = (lat, lon, alt, speed2d, speed3d, days, secs_ms, dop, fix)
        return struct.pack(">iiiiiiiHH", *fields)

    def _reference_sample(self):
        """Return the encoded sample shared by the negative-path tests."""
        return self._build_gps9_sample_bytes(
            510776007, 62268453, 85454, 295, 42, 9463, 44896000, 185, 3
        )

    def _parse(self, **parts):
        """Build a stream from *parts* and return the parsed points as a list."""
        return list(gpmf_parser._gps9_from_stream(self._make_gps9_stream(**parts)))

    def test_basic_gps9_parsing(self):
        """One GPS9 sample taken from the GoPro MAX2 data parses into one point."""
        # Real bytes from the file
        raw = bytes.fromhex(
            "1e71d2c703b6242500014dce000001270000002a000024f702ad0f0000b90003"
        )

        parsed = self._parse(
            gps9_data=[raw],
            scal_data=self._scal(),
            type_data=[self._TYPE_DATA],
        )
        assert len(parsed) == 1

        point = parsed[0]
        assert point.lat == pytest.approx(51.0776007, abs=1e-6)
        assert point.lon == pytest.approx(6.2268453, abs=1e-6)
        assert point.alt == pytest.approx(85.454, abs=0.01)
        assert point.ground_speed == pytest.approx(0.295, abs=0.001)
        assert point.fix == telemetry.GPSFix.FIX_3D
        assert point.precision == pytest.approx(185.0, abs=0.1)
        assert point.epoch_time == pytest.approx(1764332896.0, abs=1.0)
        assert point.time == 0
        assert point.angle is None

    def test_no_gps9_key(self):
        parsed = self._parse(scal_data=self._scal(), type_data=[self._TYPE_DATA])
        assert parsed == []

    def test_no_scal_key(self):
        parsed = self._parse(
            gps9_data=[self._reference_sample()],
            type_data=[self._TYPE_DATA],
        )
        assert parsed == []

    def test_no_type_key(self):
        parsed = self._parse(
            gps9_data=[self._reference_sample()],
            scal_data=self._scal(),
        )
        assert parsed == []

    def test_zero_scale_value(self):
        parsed = self._parse(
            gps9_data=[self._reference_sample()],
            scal_data=[[10000000], [0], [1000], [1000], [100], [1], [1000], [100], [1]],
            type_data=[self._TYPE_DATA],
        )
        assert parsed == []

    def test_wrong_type_length_raises(self):
        stream = self._make_gps9_stream(
            gps9_data=[self._reference_sample()],
            scal_data=self._scal(),
            type_data=[b"llll"],  # only 4 types instead of 9
        )
        with pytest.raises(ValueError, match="expect 9 types"):
            list(gpmf_parser._gps9_from_stream(stream))

    def test_multiple_gps9_samples(self):
        """Two GPS9 samples in one stream produce two points with distinct times."""
        earlier = self._reference_sample()
        later = self._build_gps9_sample_bytes(
            510776005, 62268463, 85505, 300, 45, 9463, 44897000, 190, 3
        )
        parsed = self._parse(
            gps9_data=[earlier, later],
            scal_data=self._scal(),
            type_data=[self._TYPE_DATA],
        )
        assert len(parsed) == 2
        # Each point should have its own epoch_time
        assert parsed[0].epoch_time != parsed[1].epoch_time
        expected_later = gpmf_parser._gps9_timestamp_to_epoch_time(9463, 44897.0)
        assert parsed[1].epoch_time == pytest.approx(expected_later, abs=0.01)


# ---------------------------------------------------------------------------
# 6. _find_first_device_id
# ---------------------------------------------------------------------------
class TestFindFirstDeviceId:
    """Check which device id ``_find_first_device_id`` reports for a stream."""

    # Value the tests expect when the stream carries no DVID entry.
    _FALLBACK_ID = 2**32

    def test_dvid_present(self):
        entries = [
            _make_klv(b"DVID", b"L", [[1]]),
            _make_klv(b"DVNM", b"c", [b"GoPro Max"]),
        ]
        assert gpmf_parser._find_first_device_id(entries) == 1

    def test_dvid_large_value(self):
        entries = [_make_klv(b"DVID", b"L", [[4294967295]])]
        assert gpmf_parser._find_first_device_id(entries) == 4294967295

    def test_no_dvid_returns_default(self):
        entries = [_make_klv(b"DVNM", b"c", [b"GoPro Max"])]
        found = gpmf_parser._find_first_device_id(entries)
        assert found == self._FALLBACK_ID

    def test_empty_stream(self):
        found = gpmf_parser._find_first_device_id([])
        assert found == self._FALLBACK_ID

    def test_dvid_first_wins(self):
        """With several DVID entries the first one is reported."""
        entries = [
            _make_klv(b"DVID", b"L", [[5]]),
            _make_klv(b"DVID", b"L", [[10]]),
        ]
        assert gpmf_parser._find_first_device_id(entries) == 5


# ---------------------------------------------------------------------------
# 7.
# _find_first_gps_stream - prefers GPS9 over GPS5
# ---------------------------------------------------------------------------
class TestFindFirstGpsStream:
    """Run ``_find_first_gps_stream`` over hand-built top-level KLV lists."""

    @staticmethod
    def _gps5_strm(raw_row):
        """Wrap one raw GPS5 row, its SCAL and a 3D GPSF in a STRM entry."""
        children = [
            _make_klv(b"SCAL", b"l", [[10000000], [10000000], [1000], [1000], [100]]),
            _make_klv(b"GPS5", b"l", [raw_row]),
            _make_klv(b"GPSF", b"L", [[3]]),
        ]
        return _make_klv(b"STRM", b"\x00", children)

    def test_gps5_stream_found(self):
        strm = self._gps5_strm([473598318, 85227055, 414870, 891, 119])
        found = gpmf_parser._find_first_gps_stream([strm])
        assert len(found) == 1
        assert found[0].lat == pytest.approx(47.3598318, abs=1e-7)

    def test_empty_stream(self):
        assert gpmf_parser._find_first_gps_stream([]) == []

    def test_no_strm_key(self):
        device_name = _make_klv(b"DVNM", b"c", [b"GoPro Max"])
        assert gpmf_parser._find_first_gps_stream([device_name]) == []

    def test_gps9_preferred_over_gps5(self):
        """A STRM holding GPS9 data is parsed through the GPS9 path.

        NOTE(review): the stream built here carries no GPS5 entry, so the
        preference over GPS5 is not actually put to the test - confirm intent.
        """
        import struct

        raw = struct.pack(
            ">iiiiiiiHH",
            510776007,
            62268453,
            85454,
            295,
            42,
            9463,
            44896000,
            185,
            3,
        )
        scal9 = [
            [10000000],
            [10000000],
            [1000],
            [1000],
            [100],
            [1],
            [1000],
            [100],
            [1],
        ]
        children = [
            _make_klv(b"TYPE", b"c", [b"lllllllSS"]),
            _make_klv(b"SCAL", b"l", scal9),
            _make_klv(b"GPS9", b"?", [raw]),
        ]
        strm = _make_klv(b"STRM", b"\x00", children)
        found = gpmf_parser._find_first_gps_stream([strm])
        assert len(found) == 1
        assert found[0].lat == pytest.approx(51.0776007, abs=1e-6)

    def test_gps5_fallback_when_gps9_missing(self):
        """A STRM without GPS9 is parsed through the GPS5 path."""
        strm = self._gps5_strm([343985866, -1196986334, -34141, 313, 50])
        found = gpmf_parser._find_first_gps_stream([strm])
        assert len(found) == 1
        assert found[0].lat == pytest.approx(34.3985866, abs=1e-7)
        assert found[0].lon == pytest.approx(-119.6986334, abs=1e-7)


# ---------------------------------------------------------------------------
# 8. _extract_camera_model_from_devices
# ---------------------------------------------------------------------------
class TestExtractCameraModelFromDevices:
    """Check which device name ``_extract_camera_model_from_devices`` selects."""

    @staticmethod
    def _pick(names):
        """Return the model chosen for the ``{device_id: raw_name}`` mapping."""
        return gpmf_parser._extract_camera_model_from_devices(names)

    def test_empty_device_names(self):
        assert self._pick({}) == ""

    def test_single_device(self):
        assert self._pick({1: b"GoPro Max"}) == "GoPro Max"

    def test_hero_priority(self):
        """A name containing 'hero' wins over one that only contains 'gopro'."""
        names = {1: b"GoPro Max", 2: b"Hero7 Black"}
        assert self._pick(names) == "Hero7 Black"

    def test_gopro_priority_over_other(self):
        """A name containing 'gopro' wins when no 'hero' name is present."""
        names = {1: b"SomeOtherCam", 2: b"GoPro Fusion"}
        assert self._pick(names) == "GoPro Fusion"

    def test_first_alphabetically_if_no_hero_or_gopro(self):
        names = {1: b"ZCam", 2: b"ACam"}
        assert self._pick(names) == "ACam"

    def test_whitespace_stripped(self):
        names = {1: b" GoPro Max "}
        assert self._pick(names) == "GoPro Max"

    def test_unicode_decode_error_skipped(self):
        """A name that is not valid UTF-8 is passed over."""
        names = {1: b"\xff\xfe", 2: b"GoPro Max"}
        assert self._pick(names) == "GoPro Max"

    def test_all_invalid_unicode(self):
        """When no name is valid UTF-8 the result is the empty string."""
        names = {1: b"\xff\xfe", 2: b"\x80\x81"}
        assert self._pick(names) == ""

    def test_hero_case_insensitive(self):
        names = {1: b"MAX2", 2: b"HERO12 Black"}
        assert self._pick(names) == "HERO12 Black"

    def test_gopro_case_insensitive(self):
        names = {1: b"SomeCam", 2: b"GOPRO MAX"}
        assert self._pick(names) == "GOPRO MAX"

    def test_real_max_model(self):
        """GoPro Max reports 'GoPro Max' (contains 'gopro', not 'hero')."""
        assert self._pick({1: b"GoPro Max"}) == "GoPro Max"

    def test_real_max2_model(self):
        """MAX2 has neither 'hero' nor 'gopro' in its name."""
        assert self._pick({1: b"MAX2"}) == "MAX2"


# ---------------------------------------------------------------------------
# 9. _backfill_gps_timestamps
# ---------------------------------------------------------------------------
class TestBackfillGpsTimestamps:
    """Check in-place epoch_time filling done by ``_backfill_gps_timestamps``."""

    def _make_point(self, time, epoch_time=None):
        """Return a GPSPoint at a fixed location with the given time fields."""
        fields = dict(
            time=time,
            lat=47.36,
            lon=8.52,
            alt=400.0,
            epoch_time=epoch_time,
            fix=telemetry.GPSFix.FIX_3D,
            precision=200,
            ground_speed=1.0,
            angle=None,
        )
        return telemetry.GPSPoint(**fields)

    def _track(self, *pairs):
        """Return one point per ``(time, epoch_time)`` pair."""
        return [self._make_point(time, epoch) for time, epoch in pairs]

    def test_all_have_epoch_time(self):
        """Nothing changes when every point already has an epoch_time."""
        track = self._track((0.0, 1000.0), (1.0, 1001.0), (2.0, 1002.0))
        gpmf_parser._backfill_gps_timestamps(track)
        assert track[0].epoch_time == 1000.0
        assert track[1].epoch_time == 1001.0
        assert track[2].epoch_time == 1002.0

    def test_backfill_forward(self):
        """Points following one with an epoch_time are filled in."""
        track = self._track((0.0, 1000.0), (1.0, None), (2.0, None))
        gpmf_parser._backfill_gps_timestamps(track)
        assert track[0].epoch_time == 1000.0
        assert track[1].epoch_time == pytest.approx(1001.0)
        assert track[2].epoch_time == pytest.approx(1002.0)

    def test_backfill_backward_with_reversed(self):
        """Passing ``reversed(points)`` fills the points that come earlier."""
        track = self._track((0.0, None), (1.0, None), (2.0, 1002.0))
        gpmf_parser._backfill_gps_timestamps(reversed(track))
        assert track[0].epoch_time == pytest.approx(1000.0)
        assert track[1].epoch_time == pytest.approx(1001.0)
        assert track[2].epoch_time == 1002.0

    def test_no_points_with_epoch_time(self):
        """Points stay unfilled when none of them has an epoch_time."""
        track = self._track((0.0, None), (1.0, None))
        gpmf_parser._backfill_gps_timestamps(track)
        assert track[0].epoch_time is None
        assert track[1].epoch_time is None

    def test_empty_list(self):
        """An empty list is accepted."""
        gpmf_parser._backfill_gps_timestamps([])

    def test_single_point_with_epoch(self):
        track = self._track((0.0, 1000.0))
        gpmf_parser._backfill_gps_timestamps(track)
        assert track[0].epoch_time == 1000.0

    def test_single_point_without_epoch(self):
        track = self._track((0.0, None))
        gpmf_parser._backfill_gps_timestamps(track)
        assert track[0].epoch_time is None

    def test_middle_point_has_epoch(self):
        """In a forward pass only points after the known epoch_time are filled."""
        track = self._track((0.0, None), (1.0, 1001.0), (2.0, None))
        gpmf_parser._backfill_gps_timestamps(track)
        # Forward pass: only point after 1001 gets filled
        assert track[0].epoch_time is None
        assert track[1].epoch_time == 1001.0
        assert track[2].epoch_time == pytest.approx(1002.0)

    def test_preserves_existing_epoch_times(self):
        """An epoch_time that is already set is left untouched."""
        track = self._track(
            (0.0, 1000.0),
            (1.0, None),
            (2.0, 1005.0),  # intentionally different
        )
        gpmf_parser._backfill_gps_timestamps(track)
        assert track[2].epoch_time == 1005.0  # preserved, not overwritten


# ---------------------------------------------------------------------------
# 10.
# _build_matrix, _apply_matrix, _is_matrix_calibration
# ---------------------------------------------------------------------------
class TestMatrixOperations:
    """Cover the flat-list matrix helpers of ``gpmf_parser``."""

    def test_is_matrix_calibration_identity_like(self):
        """A matrix made only of 0, 1 and -1 is not a calibration matrix."""
        flat = [1, 0, 0, 0, -1, 0, 0, 0, 1]
        assert gpmf_parser._is_matrix_calibration(flat) is False

    def test_is_matrix_calibration_actual(self):
        """A matrix holding any other value is a calibration matrix."""
        flat = [1.5, 0, 0, 0, -1, 0, 0, 0, 1]
        assert gpmf_parser._is_matrix_calibration(flat) is True

    def test_is_matrix_calibration_all_zeros(self):
        flat = [0, 0, 0, 0, 0, 0, 0, 0, 0]
        assert gpmf_parser._is_matrix_calibration(flat) is False

    def test_build_matrix_identity(self):
        """ORIN='XYZ' with ORIO='XYZ' gives the identity."""
        built = gpmf_parser._build_matrix(b"XYZ", b"XYZ")
        assert built == [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0]

    def test_build_matrix_swap_axes(self):
        """ORIN='YxZ' with ORIO='ZXY' swaps and negates axes."""
        built = gpmf_parser._build_matrix(b"YxZ", b"ZXY")
        # Y -> ZXY: Y matches at index 2 -> [0, 0, 1]
        # x (lowercase) -> negate X -> [0, -1, 0]
        # Z -> ZXY: Z matches at index 0 -> [1, 0, 0]
        assert built == [0.0, 0.0, 1.0, 0.0, -1.0, 0.0, 1.0, 0.0, 0.0]

    def test_build_matrix_negate_all(self):
        """ORIN='xyz' with ORIO='XYZ' puts -1 on the diagonal."""
        built = gpmf_parser._build_matrix(b"xyz", b"XYZ")
        assert built == [-1.0, 0.0, 0.0, 0.0, -1.0, 0.0, 0.0, 0.0, -1.0]

    def test_apply_matrix_identity(self):
        unit = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0]
        vector = [10.0, 20.0, 30.0]
        product = list(gpmf_parser._apply_matrix(unit, vector))
        assert product == [10.0, 20.0, 30.0]

    def test_apply_matrix_negate_y(self):
        flip_y = [1.0, 0.0, 0.0, 0.0, -1.0, 0.0, 0.0, 0.0, 1.0]
        vector = [10.0, 20.0, 30.0]
        product = list(gpmf_parser._apply_matrix(flip_y, vector))
        assert product == [10.0, -20.0, 30.0]

    def test_apply_matrix_swap(self):
        swap = [0.0, 0.0, 1.0, 0.0, -1.0, 0.0, 1.0, 0.0, 0.0]
        vector = [10.0, 20.0, 30.0]
        product = list(gpmf_parser._apply_matrix(swap, vector))
        assert product == [30.0, -20.0, 10.0]

    def test_apply_matrix_wrong_size_asserts(self):
        not_square = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0]  # 6 elements, not 9
        with pytest.raises(AssertionError, match="square matrix"):
            list(gpmf_parser._apply_matrix(not_square, [10.0, 20.0, 30.0]))

    def test_apply_matrix_2d(self):
        """2x2 matrix multiplication."""
        swap = [0.0, 1.0, 1.0, 0.0]  # swap
        vector = [3.0, 7.0]
        product = list(gpmf_parser._apply_matrix(swap, vector))
        assert product == [7.0, 3.0]


# ---------------------------------------------------------------------------
# 11. _scale_and_calibrate
# ---------------------------------------------------------------------------
class TestScaleAndCalibrate:
    """Run ``_scale_and_calibrate`` over small hand-built sensor streams."""

    @staticmethod
    def _run(entries, key):
        """Return the values produced for *key* as a list."""
        return list(gpmf_parser._scale_and_calibrate(entries, key))

    def test_basic_scaling(self):
        entries = [
            _make_klv(b"SCAL", b"s", [[100], [200], [300]]),
            _make_klv(b"ACCL", b"s", [[1000, 2000, 3000], [500, 600, 900]]),
        ]
        scaled = self._run(entries, b"ACCL")
        assert len(scaled) == 2
        assert scaled[0] == pytest.approx((10.0, 10.0, 10.0))
        assert scaled[1] == pytest.approx((5.0, 3.0, 3.0))

    def test_single_scale_repeated(self):
        """A lone SCAL value is applied to every element of a sample."""
        entries = [
            _make_klv(b"SCAL", b"s", [[100]]),
            _make_klv(b"GYRO", b"s", [[200, 400, 600]]),
        ]
        scaled = self._run(entries, b"GYRO")
        assert len(scaled) == 1
        assert scaled[0] == pytest.approx((2.0, 4.0, 6.0))

    def test_missing_key_returns_empty(self):
        entries = [
            _make_klv(b"SCAL", b"s", [[100]]),
            _make_klv(b"ACCL", b"s", [[200, 300, 400]]),
        ]
        assert self._run(entries, b"GYRO") == []

    def test_with_orin_orio_matrix(self):
        """The ORIN/ORIO matrix is applied to the scaled values."""
        entries = [
            _make_klv(b"SCAL", b"s", [[100], [100], [100]]),
            _make_klv(b"ACCL", b"s", [[1000, 2000, 3000]]),
            _make_klv(b"ORIN", b"c", [b"Y", b"x", b"Z"]),
            _make_klv(b"ORIO", b"c", [b"Z", b"X", b"Y"]),
        ]
        scaled = self._run(entries, b"ACCL")
        assert len(scaled) == 1
        # ORIN=YxZ, ORIO=ZXY -> matrix=[0,0,1, 0,-1,0, 1,0,0]
        # scaled = [10, 20, 30]
        # matrix * scaled = [30, -20, 10]
        assert scaled[0] == pytest.approx((30.0, -20.0, 10.0))

    def test_zero_scal_replaced_with_one(self):
        """A zero SCAL value acts as 1, so no division by zero occurs."""
        entries = [
            _make_klv(b"SCAL", b"s", [[0], [100], [0]]),
            _make_klv(b"ACCL", b"s", [[500, 1000, 300]]),
        ]
        scaled = self._run(entries, b"ACCL")
        assert len(scaled) == 1
        # 0 replaced with 1, so: 500/1=500, 1000/100=10, 300/1=300
        assert scaled[0] == pytest.approx((500.0, 10.0, 300.0))


# ---------------------------------------------------------------------------
# 12. KLV parsing basics (existing test expanded)
# ---------------------------------------------------------------------------
class TestKLVParsing:
    """Parse small literal byte strings with the KLV constructs."""

    def test_simple_klv(self):
        parsed = gpmf_parser.KLV.parse(b"DEMO\x02\x01\x00\x01\xff\x00\x00\x00")
        assert parsed["key"] == b"DEMO"

    def test_gpmf_sample_data(self):
        parsed = gpmf_parser.GPMFSampleData.parse(
            b"DEM1\x01\x01\x00\x01\xff\x00\x00\x00DEM2\x03\x00\x00\x01"
        )
        assert len(parsed) == 2
        assert parsed[0]["key"] == b"DEM1"
        assert parsed[1]["key"] == b"DEM2"


# ---------------------------------------------------------------------------
# 13.
GoProInfo dataclass defaults +# --------------------------------------------------------------------------- +class TestGoProInfo: + def test_defaults(self): + info = gpmf_parser.GoProInfo() + assert info.gps is None + assert info.accl is None + assert info.gyro is None + assert info.magn is None + assert info.make == "GoPro" + assert info.model == "" + + +# ============================================================================ +# INTEGRATION TESTS (require real test data files) +# ============================================================================ + + +# --------------------------------------------------------------------------- +# 14. extract_gopro_info with GPS5 file (GoPro MAX) +# --------------------------------------------------------------------------- +@pytest.mark.skipif(not _has_gps5_video, reason="GPS5 test data not available") +class TestExtractGoProInfoGPS5: + def test_basic_extraction(self): + with open(GPS5_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f) + assert info is not None + assert info.make == "GoPro" + assert info.model == "GoPro Max" + + def test_gps_count(self): + with open(GPS5_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f) + assert info.gps is not None + assert len(info.gps) == 1737 + + def test_first_gps_point(self): + with open(GPS5_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f) + p = info.gps[0] + assert p.lat == pytest.approx(47.3598318, abs=1e-6) + assert p.lon == pytest.approx(8.5227055, abs=1e-6) + assert p.alt == pytest.approx(414.87, abs=0.1) + assert p.fix == telemetry.GPSFix.FIX_3D + assert p.precision == 219 + assert p.ground_speed == pytest.approx(0.891, abs=0.01) + + def test_epoch_time_backfilled(self): + """All GPS points should have epoch_time after backfilling.""" + with open(GPS5_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f) + for p in info.gps: + assert p.epoch_time is not None + + def test_telemetry_not_extracted_by_default(self): + """By 
default (telemetry_only=False), ACCL/GYRO/MAGN are None.""" + with open(GPS5_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f) + assert info.accl is None + assert info.gyro is None + assert info.magn is None + + +# --------------------------------------------------------------------------- +# 15. extract_gopro_info with GPS9 file (GoPro MAX 2) +# --------------------------------------------------------------------------- +@pytest.mark.skipif(not _has_gps9_video, reason="GPS9 test data not available") +class TestExtractGoProInfoGPS9: + def test_basic_extraction(self): + with open(GPS9_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f) + assert info is not None + assert info.make == "GoPro" + assert info.model == "MAX2" + + def test_gps_count(self): + with open(GPS9_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f) + assert info.gps is not None + assert len(info.gps) == 267 + + def test_first_gps_point(self): + with open(GPS9_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f) + p = info.gps[0] + assert p.lat == pytest.approx(51.0776007, abs=1e-5) + assert p.lon == pytest.approx(6.2268453, abs=1e-5) + assert p.alt == pytest.approx(85.454, abs=0.1) + assert p.fix == telemetry.GPSFix.FIX_3D + assert p.precision == pytest.approx(185.0, abs=1.0) + + def test_epoch_time_backfilled(self): + with open(GPS9_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f) + for p in info.gps: + assert p.epoch_time is not None + + +# --------------------------------------------------------------------------- +# 16. 
extract_gopro_info with telemetry_only mode +# --------------------------------------------------------------------------- +@pytest.mark.skipif(not _has_hero7_video, reason="Hero7 test data not available") +class TestExtractGoProInfoTelemetryOnly: + def test_telemetry_only_no_gps(self): + """In telemetry_only mode, GPS is None.""" + with open(HERO7_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f, telemetry_only=True) + assert info is not None + assert info.gps is None + + def test_telemetry_only_no_model(self): + """In telemetry_only mode, model is not extracted.""" + with open(HERO7_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f, telemetry_only=True) + assert info.model == "" + + def test_telemetry_only_has_accl(self): + with open(HERO7_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f, telemetry_only=True) + assert info.accl is not None + assert len(info.accl) == 103806 + + def test_telemetry_only_has_gyro(self): + with open(HERO7_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f, telemetry_only=True) + assert info.gyro is not None + assert len(info.gyro) == 103806 + + def test_telemetry_only_first_accl_values(self): + with open(HERO7_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f, telemetry_only=True) + a = info.accl[0] + assert a.x == pytest.approx(1.672, abs=0.01) + assert a.y == pytest.approx(5.175, abs=0.01) + assert a.z == pytest.approx(11.022, abs=0.01) + + def test_telemetry_only_first_gyro_values(self): + with open(HERO7_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f, telemetry_only=True) + g = info.gyro[0] + assert g.x == pytest.approx(0.121, abs=0.01) + assert g.y == pytest.approx(0.896, abs=0.01) + assert g.z == pytest.approx(0.165, abs=0.01) + + +# --------------------------------------------------------------------------- +# 17. 
extract_gopro_info normal mode with Hero 7 +# --------------------------------------------------------------------------- +@pytest.mark.skipif(not _has_hero7_video, reason="Hero7 test data not available") +class TestExtractGoProInfoHero7: + def test_model_is_hero7(self): + with open(HERO7_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f) + assert info is not None + assert info.model == "Hero7 Black" + + def test_gps_count(self): + with open(HERO7_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f) + assert info.gps is not None + assert len(info.gps) == 2435 + + def test_first_gps_point(self): + with open(HERO7_VIDEO, "rb") as f: + info = gpmf_parser.extract_gopro_info(f) + p = info.gps[0] + assert p.lat == pytest.approx(34.3985866, abs=1e-5) + assert p.lon == pytest.approx(-119.6986334, abs=1e-5) + assert p.alt == pytest.approx(-34.141, abs=0.1) + assert p.fix == telemetry.GPSFix.FIX_3D + assert p.ground_speed == pytest.approx(0.313, abs=0.01) + + +# --------------------------------------------------------------------------- +# 18. 
_flatten helper +# --------------------------------------------------------------------------- +class TestFlatten: + def test_basic(self): + assert gpmf_parser._flatten([[1, 2], [3, 4]]) == [1, 2, 3, 4] + + def test_single_row(self): + assert gpmf_parser._flatten([[5, 6, 7]]) == [5, 6, 7] + + def test_empty(self): + assert gpmf_parser._flatten([]) == [] diff --git a/tests/unit/test_sample_video.py b/tests/unit/test_sample_video.py index f9c076cb..e92aef4a 100644 --- a/tests/unit/test_sample_video.py +++ b/tests/unit/test_sample_video.py @@ -11,16 +11,31 @@ import shutil import typing as T from pathlib import Path +from unittest import mock import py.path import pytest -from mapillary_tools import exif_read, ffmpeg, sample_video + +from mapillary_tools import ( + exceptions, + exif_read, + ffmpeg as ffmpeglib, + geo, + sample_video, +) +from mapillary_tools.mp4 import mp4_sample_parser from mapillary_tools.serializer import description +from mapillary_tools.types import FileType, VideoMetadata _PWD = Path(os.path.dirname(os.path.abspath(__file__))) -class MOCK_FFMPEG(ffmpeg.FFMPEG): +# --------------------------------------------------------------------------- +# Interval-based sampling tests (using MOCK_FFMPEG) +# --------------------------------------------------------------------------- + + +class MOCK_FFMPEG(ffmpeglib.FFMPEG): def extract_frames_by_interval( self, video_path: Path, @@ -40,14 +55,14 @@ def extract_frames_by_interval( sample = f"{frame_path_prefix}_{stream_specifier}_{idx + 1:06d}.jpg" shutil.copyfile(src, sample) - def probe_format_and_streams(self, video_path: Path) -> ffmpeg.ProbeOutput: + def probe_format_and_streams(self, video_path: Path) -> ffmpeglib.ProbeOutput: with open(video_path) as fp: return json.load(fp) @pytest.fixture def setup_mock(monkeypatch): - monkeypatch.setattr(ffmpeg, "FFMPEG", MOCK_FFMPEG) + monkeypatch.setattr(ffmpeglib, "FFMPEG", MOCK_FFMPEG) def _validate_interval(samples: T.Sequence[Path], video_start_time): @@ -107,3 
+122,490 @@ def test_sample_video_with_start_time(tmpdir: py.path.local, setup_mock): ) samples = sample_dir.join("hello.mp4").listdir() _validate_interval([Path(s) for s in samples], video_start_time) + + +# --------------------------------------------------------------------------- +# Helpers for distance-based sampling tests +# --------------------------------------------------------------------------- + +MOCK_PROBE_JSON = _PWD / "data" / "mock_sample_video" / "videos" / "hello.mp4" +TEST_EXIF_JPG = _PWD / "data" / "test_exif.jpg" + +# Start time derived from the hello.mp4 probe fixture: +# creation_time "2021-08-10T14:38:06.000000Z" - duration "60.977000" +PROBE_START_TIME = datetime.datetime( + 2021, 8, 10, 14, 36, 55, 23000, tzinfo=datetime.timezone.utc +) + + +def _load_probe_output() -> ffmpeglib.ProbeOutput: + with open(MOCK_PROBE_JSON) as fp: + return T.cast(ffmpeglib.ProbeOutput, json.load(fp)) + + +def _make_gps_points( + n: int = 10, + start_lat: float = 40.0, + start_lon: float = -74.0, + lat_step: float = 0.001, + lon_step: float = 0.001, + time_step: float = 1.0, +) -> list[geo.Point]: + """Create a synthetic GPS track with n points.""" + return [ + geo.Point( + time=i * time_step, + lat=start_lat + i * lat_step, + lon=start_lon + i * lon_step, + alt=10.0, + angle=45.0, + ) + for i in range(n) + ] + + +def _make_sample( + composition_time: float, + timedelta: float = 0.033, +) -> mp4_sample_parser.Sample: + """Create a synthetic mp4 Sample at the given composition time.""" + raw = mp4_sample_parser.RawSample( + description_idx=1, + offset=0, + size=1000, + timedelta=int(timedelta * 1000), + composition_offset=0, + is_sync=True, + ) + return mp4_sample_parser.Sample( + raw_sample=raw, + exact_time=composition_time, + exact_composition_time=composition_time, + exact_timedelta=timedelta, + description={}, + ) + + +def _create_fake_frames( + sample_dir: Path, + video_stem: str, + stream_specifier: str, + num_frames: int, +) -> list[Path]: + """Create 
fake JPEG frame files in sample_dir mimicking ffmpeg output.""" + os.makedirs(sample_dir, exist_ok=True) + paths: list[Path] = [] + for i in range(1, num_frames + 1): + name = f"{video_stem}_{stream_specifier}_{i:06d}.jpg" + frame_path = sample_dir / name + shutil.copy(str(TEST_EXIF_JPG), str(frame_path)) + paths.append(frame_path) + return paths + + +# --------------------------------------------------------------------------- +# Distance-based sampling: _within_track_time_range_buffered +# --------------------------------------------------------------------------- + + +class TestWithinTrackTimeRangeBuffered: + """Tests for _within_track_time_range_buffered.""" + + def test_within_range(self) -> None: + points = _make_gps_points(5, time_step=1.0) + assert sample_video._within_track_time_range_buffered(points, 2.0) is True + + def test_at_start_boundary(self) -> None: + points = _make_gps_points(5, time_step=1.0) + assert sample_video._within_track_time_range_buffered(points, 0.0) is True + + def test_at_end_boundary(self) -> None: + points = _make_gps_points(5, time_step=1.0) + assert sample_video._within_track_time_range_buffered(points, 4.0) is True + + def test_within_1ms_buffer_before_start(self) -> None: + points = _make_gps_points(5, time_step=1.0) + assert sample_video._within_track_time_range_buffered(points, -0.0005) is True + + def test_within_1ms_buffer_after_end(self) -> None: + points = _make_gps_points(5, time_step=1.0) + assert sample_video._within_track_time_range_buffered(points, 4.0005) is True + + def test_outside_buffer_before_start(self) -> None: + points = _make_gps_points(5, time_step=1.0) + assert sample_video._within_track_time_range_buffered(points, -0.002) is False + + def test_outside_buffer_after_end(self) -> None: + points = _make_gps_points(5, time_step=1.0) + assert sample_video._within_track_time_range_buffered(points, 4.002) is False + + def test_exactly_at_1ms_boundary(self) -> None: + points = _make_gps_points(5, time_step=1.0) 
+ assert sample_video._within_track_time_range_buffered(points, -0.001) is True + assert sample_video._within_track_time_range_buffered(points, 4.001) is True + + +# --------------------------------------------------------------------------- +# Distance-based sampling: _sample_video_stream_by_distance +# --------------------------------------------------------------------------- + + +class TestSampleVideoStreamByDistance: + """Tests for _sample_video_stream_by_distance.""" + + def test_selects_frames_by_distance(self) -> None: + """Frames spaced farther than sample_distance should be selected.""" + points = _make_gps_points(10, lat_step=0.001, time_step=1.0) + samples = [_make_sample(float(i)) for i in range(10)] + + mock_parser = mock.MagicMock(spec=mp4_sample_parser.TrackBoxParser) + mock_parser.extract_samples.return_value = iter(samples) + + result = sample_video._sample_video_stream_by_distance( + points, mock_parser, sample_distance=50.0 + ) + + # Each point is ~111m apart in lat, so all 10 should be selected + assert len(result) == 10 + assert all(idx in result for idx in range(10)) + + def test_filters_close_frames(self) -> None: + """Frames closer than sample_distance should be filtered out.""" + # ~15m apart (0.0001 degree in each axis) + points = _make_gps_points(10, lat_step=0.0001, lon_step=0.0001, time_step=1.0) + samples = [_make_sample(float(i)) for i in range(10)] + + mock_parser = mock.MagicMock(spec=mp4_sample_parser.TrackBoxParser) + mock_parser.extract_samples.return_value = iter(samples) + + result = sample_video._sample_video_stream_by_distance( + points, mock_parser, sample_distance=50.0 + ) + + assert len(result) < 10 + assert 0 in result # first frame is always selected + + def test_zero_distance_selects_all(self) -> None: + """With sample_distance=0, all frames in range should be selected.""" + points = _make_gps_points(5, time_step=1.0) + samples = [_make_sample(float(i)) for i in range(5)] + + mock_parser = 
mock.MagicMock(spec=mp4_sample_parser.TrackBoxParser) + mock_parser.extract_samples.return_value = iter(samples) + + result = sample_video._sample_video_stream_by_distance( + points, mock_parser, sample_distance=0.0 + ) + + assert len(result) == 5 + + def test_frames_outside_track_range_excluded(self) -> None: + """Frames outside the GPS track time range should not be selected.""" + # GPS track covers t=2..6 + points = [ + geo.Point(time=p.time + 2.0, lat=p.lat, lon=p.lon, alt=p.alt, angle=p.angle) + for p in _make_gps_points(5, time_step=1.0) + ] + + # Samples at t=0..9 — only t=2..6 should be interpolated + samples = [_make_sample(float(i)) for i in range(10)] + + mock_parser = mock.MagicMock(spec=mp4_sample_parser.TrackBoxParser) + mock_parser.extract_samples.return_value = iter(samples) + + result = sample_video._sample_video_stream_by_distance( + points, mock_parser, sample_distance=0.0 + ) + + for idx in result: + sample_time = samples[idx].exact_composition_time + assert 1.999 <= sample_time <= 6.001 + + def test_empty_samples(self) -> None: + """Empty video track should produce no selected frames.""" + points = _make_gps_points(5, time_step=1.0) + + mock_parser = mock.MagicMock(spec=mp4_sample_parser.TrackBoxParser) + mock_parser.extract_samples.return_value = iter([]) + + result = sample_video._sample_video_stream_by_distance( + points, mock_parser, sample_distance=3.0 + ) + + assert len(result) == 0 + + +# --------------------------------------------------------------------------- +# sample_video() parameter validation & rerun +# --------------------------------------------------------------------------- + + +class TestSampleVideoNegativeDistance: + """Test sample_video() with invalid parameters.""" + + def test_negative_distance_raises(self, tmp_path: Path) -> None: + video_dir = tmp_path / "videos" + video_dir.mkdir() + (video_dir / "test.mp4").touch() + + with pytest.raises(exceptions.MapillaryBadParameterError): + sample_video.sample_video( + 
video_import_path=video_dir, + import_path=tmp_path / "output", + video_sample_distance=1.0, + video_sample_interval=1.0, + ) + + +class TestSampleVideoRerun: + """Test rerun behavior of sample_video.""" + + def test_skip_existing_samples_without_rerun(self, tmp_path: Path) -> None: + """Existing sample directories should be skipped without --rerun.""" + video_dir = tmp_path / "videos" + video_dir.mkdir() + (video_dir / "test.mp4").touch() + + output_dir = tmp_path / "output" + sample_dir = output_dir / "test.mp4" + sample_dir.mkdir(parents=True) + (sample_dir / "frame_000001.jpg").touch() + + with mock.patch.object( + sample_video, "_sample_single_video_by_distance" + ) as mock_sample: + sample_video.sample_video( + video_import_path=video_dir, + import_path=output_dir, + rerun=False, + ) + mock_sample.assert_not_called() + + def test_rerun_removes_existing_and_resamples(self, tmp_path: Path) -> None: + """With --rerun, existing sample directories should be removed.""" + video_dir = tmp_path / "videos" + video_dir.mkdir() + (video_dir / "test.mp4").touch() + + output_dir = tmp_path / "output" + sample_dir = output_dir / "test.mp4" + sample_dir.mkdir(parents=True) + marker = sample_dir / "old_frame.jpg" + marker.touch() + + with mock.patch.object( + sample_video, "_sample_single_video_by_distance" + ) as mock_sample: + sample_video.sample_video( + video_import_path=video_dir, + import_path=output_dir, + rerun=True, + ) + assert not marker.exists() + mock_sample.assert_called_once() + + +# --------------------------------------------------------------------------- +# Distance-based sampling: integration tests with mocked ffmpeg and geotag +# --------------------------------------------------------------------------- + + +class TestSampleVideoDistanceIntegration: + """Integration-style tests for the distance-based sampling path.""" + + def _setup_mocks( + self, + tmp_path: Path, + video_path: Path, + num_gps_points: int = 10, + ) -> dict[str, T.Any]: + """Set up all 
the mocks needed for _sample_single_video_by_distance.""" + probe_output = _load_probe_output() + gps_points = _make_gps_points(num_gps_points, time_step=1.0) + + video_metadata = VideoMetadata( + filename=video_path, + filetype=FileType.CAMM, + points=gps_points, + make="TestMake", + model="TestModel", + ) + + video_samples = [_make_sample(float(i)) for i in range(num_gps_points)] + + mock_track_parser = mock.MagicMock(spec=mp4_sample_parser.TrackBoxParser) + mock_track_parser.extract_samples.return_value = iter(video_samples) + + mock_moov_parser = mock.MagicMock(spec=mp4_sample_parser.MovieBoxParser) + mock_moov_parser.extract_track_at.return_value = mock_track_parser + + patches = {} + + # Mock FFMPEG: instance methods are mocked, classmethods delegate to real + def fake_extract_frames( + video_path: Path, + sample_dir: Path, + frame_indices: set[int], + stream_specifier: str = "v", + ) -> None: + _create_fake_frames( + sample_dir, + video_path.stem, + stream_specifier, + len(frame_indices), + ) + + mock_ffmpeg_instance = mock.MagicMock(spec=ffmpeglib.FFMPEG) + mock_ffmpeg_instance.probe_format_and_streams.return_value = probe_output + mock_ffmpeg_instance.extract_specified_frames.side_effect = fake_extract_frames + + mock_ffmpeg_class = mock.MagicMock() + mock_ffmpeg_class.return_value = mock_ffmpeg_instance + mock_ffmpeg_class.sort_selected_samples = ffmpeglib.FFMPEG.sort_selected_samples + mock_ffmpeg_class.iterate_samples = ffmpeglib.FFMPEG.iterate_samples + mock_ffmpeg_class._extract_stream_frame_idx = ( + ffmpeglib.FFMPEG._extract_stream_frame_idx + ) + mock_ffmpeg_class._validate_stream_specifier = ( + ffmpeglib.FFMPEG._validate_stream_specifier + ) + mock_ffmpeg_class.FRAME_EXT = ffmpeglib.FFMPEG.FRAME_EXT + + patches["ffmpeg_cls"] = mock.patch( + "mapillary_tools.sample_video.ffmpeglib.FFMPEG", + mock_ffmpeg_class, + ) + + mock_geotag_instance = mock.MagicMock() + mock_geotag_instance.to_description.return_value = [video_metadata] + 
patches["geotag_cls"] = mock.patch( + "mapillary_tools.sample_video.geotag_videos_from_video.GeotagVideosFromVideo", + return_value=mock_geotag_instance, + ) + + patches["moov_parse"] = mock.patch.object( + mp4_sample_parser.MovieBoxParser, + "parse_file", + return_value=mock_moov_parser, + ) + + return { + "patches": patches, + "gps_points": gps_points, + "video_metadata": video_metadata, + } + + def test_single_video_file(self, tmp_path: Path) -> None: + """sample_video with a single video file produces sample frames.""" + video_dir = tmp_path / "videos" + video_dir.mkdir() + video_file = video_dir / "test.mp4" + video_file.touch() + output_dir = tmp_path / "output" + + mocks = self._setup_mocks(tmp_path, video_file) + + with ( + mocks["patches"]["ffmpeg_cls"], + mocks["patches"]["geotag_cls"], + mocks["patches"]["moov_parse"], + ): + sample_video.sample_video( + video_import_path=video_file, + import_path=output_dir, + video_sample_distance=0.0, + ) + + sample_dir = output_dir / "test.mp4" + assert sample_dir.is_dir() + + frames = list(sample_dir.glob("*.jpg")) + assert len(frames) > 0 + + exif = exif_read.ExifRead(frames[0]) + assert exif.extract_lon_lat() is not None + assert exif.extract_capture_time() is not None + + def test_video_directory(self, tmp_path: Path) -> None: + """sample_video with a directory processes all videos.""" + video_dir = tmp_path / "videos" + video_dir.mkdir() + (video_dir / "clip1.mp4").touch() + (video_dir / "clip2.mp4").touch() + output_dir = tmp_path / "output" + + with mock.patch.object( + sample_video, "_sample_single_video_by_distance" + ) as mock_sample: + sample_video.sample_video( + video_import_path=video_dir, + import_path=output_dir, + video_sample_distance=3.0, + ) + assert mock_sample.call_count == 2 + + def test_custom_start_time(self, tmp_path: Path) -> None: + """sample_video with video_start_time override uses the given time.""" + video_dir = tmp_path / "videos" + video_dir.mkdir() + video_file = video_dir / 
"test.mp4" + video_file.touch() + output_dir = tmp_path / "output" + + mocks = self._setup_mocks(tmp_path, video_file) + + with ( + mocks["patches"]["ffmpeg_cls"], + mocks["patches"]["geotag_cls"], + mocks["patches"]["moov_parse"], + ): + sample_video.sample_video( + video_import_path=video_file, + import_path=output_dir, + video_sample_distance=0.0, + video_start_time="2023_06_15_12_00_00_000", + ) + + sample_dir = output_dir / "test.mp4" + frames = list(sample_dir.glob("*.jpg")) + assert len(frames) > 0 + + exif = exif_read.ExifRead(frames[0]) + capture_time = exif.extract_capture_time() + assert capture_time is not None + assert capture_time.year == 2023 + assert capture_time.month == 6 + assert capture_time.day == 15 + + def test_exif_lat_lon_written(self, tmp_path: Path) -> None: + """Verify GPS coordinates are written into EXIF of sampled frames.""" + video_dir = tmp_path / "videos" + video_dir.mkdir() + video_file = video_dir / "test.mp4" + video_file.touch() + output_dir = tmp_path / "output" + + mocks = self._setup_mocks(tmp_path, video_file) + + with ( + mocks["patches"]["ffmpeg_cls"], + mocks["patches"]["geotag_cls"], + mocks["patches"]["moov_parse"], + ): + sample_video.sample_video( + video_import_path=video_file, + import_path=output_dir, + video_sample_distance=0.0, + ) + + sample_dir = output_dir / "test.mp4" + frames = sorted(sample_dir.glob("*.jpg")) + assert len(frames) > 0 + + exif = exif_read.ExifRead(frames[0]) + lon_lat = exif.extract_lon_lat() + assert lon_lat is not None + lon, lat = lon_lat + # First GPS point is at (40.0, -74.0) + assert abs(lat - 40.0) < 0.01 + assert abs(lon - (-74.0)) < 0.01 From 709a157bfc371dc12ae5047b439670dca1096745 Mon Sep 17 00:00:00 2001 From: Caglar Pir Date: Tue, 3 Mar 2026 07:24:42 -0800 Subject: [PATCH 2/2] Reduce cyclomatic complexity in GPS parser functions --- mapillary_tools/blackvue_parser.py | 134 +++++++++++----------- mapillary_tools/exiftool_read_video.py | 150 +++++++++++++++---------- 
mapillary_tools/gpmf/gpmf_parser.py | 94 +++++++++------- 3 files changed, 219 insertions(+), 159 deletions(-) diff --git a/mapillary_tools/blackvue_parser.py b/mapillary_tools/blackvue_parser.py index ffb1bc4b..625c95dc 100644 --- a/mapillary_tools/blackvue_parser.py +++ b/mapillary_tools/blackvue_parser.py @@ -193,6 +193,50 @@ def _parse_nmea_lines( yield epoch_ms, message +def _detect_timezone_offset( + parsed_lines: list[tuple[float, pynmea2.NMEASentence]], +) -> float: + """ + Detect timezone offset between camera clock and GPS time. + + Tries RMC messages first (most reliable - has full date+time), + then falls back to GGA/GLL (less reliable - time only, no date). + Returns 0.0 if no offset could be determined. + """ + first_valid_gga_gll: tuple[float, pynmea2.NMEASentence] | None = None + + for epoch_sec, message in parsed_lines: + if message.sentence_type == "RMC": + if hasattr(message, "is_valid") and message.is_valid: + offset = _compute_timezone_offset_from_rmc(epoch_sec, message) + if offset is not None: + LOG.debug( + "Computed timezone offset %.1fs from RMC (%s %s)", + offset, + message.datestamp, + message.timestamp, + ) + return offset + + if first_valid_gga_gll is None and message.sentence_type in ["GGA", "GLL"]: + if hasattr(message, "is_valid") and message.is_valid: + first_valid_gga_gll = (epoch_sec, message) + + # Fallback: if no RMC found, try GGA/GLL (less reliable - no date info) + if first_valid_gga_gll is not None: + epoch_sec, message = first_valid_gga_gll + offset = _compute_timezone_offset_from_time_only(epoch_sec, message) + if offset is not None: + LOG.debug( + "Computed timezone offset %.1fs from %s (fallback, no date info)", + offset, + message.sentence_type, + ) + return offset + + return 0.0 + + def _parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]: """ >>> list(_parse_gps_box(b"[1623057074211]$GPGGA,202530.00,5109.0262,N,11401.8407,W,5,40,0.5,1097.36,M,-17.00,M,18,TSTR*61")) @@ -210,83 +254,47 @@ def 
_parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]: >>> list(_parse_gps_box(b"[1623057074211]$GPVTG,,T,,M,0.078,N,0.144,K,D*28[1623057075215]")) [] """ - timezone_offset: float | None = None parsed_lines: list[tuple[float, pynmea2.NMEASentence]] = [] - first_valid_gga_gll: tuple[float, pynmea2.NMEASentence] | None = None - # First pass: collect parsed_lines and compute timezone offset from the first valid RMC message + # First pass: collect parsed_lines for epoch_ms, message in _parse_nmea_lines(gps_data): # Rounding needed to avoid floating point precision issues epoch_sec = round(epoch_ms / 1000, 3) parsed_lines.append((epoch_sec, message)) - if timezone_offset is None and message.sentence_type == "RMC": - if hasattr(message, "is_valid") and message.is_valid: - timezone_offset = _compute_timezone_offset_from_rmc(epoch_sec, message) - if timezone_offset is not None: - LOG.debug( - "Computed timezone offset %.1fs from RMC (%s %s)", - timezone_offset, - message.datestamp, - message.timestamp, - ) - # Track first valid GGA/GLL for fallback - if first_valid_gga_gll is None and message.sentence_type in ["GGA", "GLL"]: - if hasattr(message, "is_valid") and message.is_valid: - first_valid_gga_gll = (epoch_sec, message) - - # Fallback: if no RMC found, try GGA/GLL (less reliable - no date info) - if timezone_offset is None and first_valid_gga_gll is not None: - epoch_sec, message = first_valid_gga_gll - timezone_offset = _compute_timezone_offset_from_time_only(epoch_sec, message) - if timezone_offset is not None: - LOG.debug( - "Computed timezone offset %.1fs from %s (fallback, no date info)", - timezone_offset, - message.sentence_type, - ) - # If no offset could be determined, use 0 (camera clock assumed correct) - if timezone_offset is None: - timezone_offset = 0.0 + timezone_offset = _detect_timezone_offset(parsed_lines) points_by_sentence_type: dict[str, list[telemetry.GPSPoint]] = {} # Second pass: apply offset to all GPS points for epoch_sec, message in 
parsed_lines: + if message.sentence_type not in ("GGA", "RMC", "GLL"): + continue + if not message.is_valid: + continue + corrected_epoch = round(epoch_sec + timezone_offset, 3) - # https://tavotech.com/gps-nmea-sentence-structure/ - if message.sentence_type in ["GGA"]: - if not message.is_valid: - continue - point = telemetry.GPSPoint( - time=corrected_epoch, - lat=message.latitude, - lon=message.longitude, - alt=message.altitude, - angle=None, - epoch_time=corrected_epoch, - fix=telemetry.GPSFix.FIX_3D if message.gps_qual >= 1 else None, - precision=None, - ground_speed=None, - ) - points_by_sentence_type.setdefault(message.sentence_type, []).append(point) - - elif message.sentence_type in ["RMC", "GLL"]: - if not message.is_valid: - continue - point = telemetry.GPSPoint( - time=corrected_epoch, - lat=message.latitude, - lon=message.longitude, - alt=None, - angle=None, - epoch_time=corrected_epoch, - fix=None, - precision=None, - ground_speed=None, - ) - points_by_sentence_type.setdefault(message.sentence_type, []).append(point) + # GGA has altitude and fix; RMC and GLL do not + if message.sentence_type == "GGA": + alt = message.altitude + fix = telemetry.GPSFix.FIX_3D if message.gps_qual >= 1 else None + else: + alt = None + fix = None + + point = telemetry.GPSPoint( + time=corrected_epoch, + lat=message.latitude, + lon=message.longitude, + alt=alt, + angle=None, + epoch_time=corrected_epoch, + fix=fix, + precision=None, + ground_speed=None, + ) + points_by_sentence_type.setdefault(message.sentence_type, []).append(point) # This is the extraction order in exiftool if "RMC" in points_by_sentence_type: diff --git a/mapillary_tools/exiftool_read_video.py b/mapillary_tools/exiftool_read_video.py index a5c2fcc0..66d0d0ad 100644 --- a/mapillary_tools/exiftool_read_video.py +++ b/mapillary_tools/exiftool_read_video.py @@ -120,6 +120,86 @@ def _deduplicate_gps_points( return deduplicated_track +def _aggregate_float_values_same_length( + texts_by_tag: dict[str, 
list[str]], + tag: str | None, + expected_length: int, +) -> list[float | None]: + if tag is not None: + vals = [ + _maybe_float(val) + for val in _extract_alternative_fields(texts_by_tag, [tag], list) or [] + ] + else: + vals = [] + while len(vals) < expected_length: + vals.append(None) + return vals + + +def _aggregate_epoch_times( + texts_by_tag: dict[str, list[str]], + gps_time_tag: str | None, + time_tag: str | None, + timestamps: list[float | None], + expected_length: int, +) -> list[float | None]: + """Aggregate GPS epoch times from tags, with fallback to per-point timestamps.""" + if gps_time_tag is not None: + gps_epoch_times: list[float | None] = [ + geo.as_unix_time(dt) if dt is not None else None + for dt in ( + exif_read.parse_gps_datetime(text) + for text in _extract_alternative_fields( + texts_by_tag, [gps_time_tag], list + ) + or [] + ) + ] + if len(gps_epoch_times) != expected_length: + LOG.warning( + "Found different number of GPS epoch times %d and coordinates %d", + len(gps_epoch_times), + expected_length, + ) + gps_epoch_times = [None] * expected_length + return gps_epoch_times + elif time_tag is not None: + # Use per-point GPS timestamps as epoch times + return [t for t in timestamps] + else: + return [None] * expected_length + + +def _aggregate_timestamps( + texts_by_tag: dict[str, list[str]], + time_tag: str | None, + expected_length: int, +) -> list[float | None] | None: + """Aggregate timestamps from the time tag. + + Returns the timestamp list, or None if the lengths don't match + (caller should return [] in that case). 
+ """ + if time_tag is not None: + dts = [ + exif_read.parse_gps_datetime(text) + for text in _extract_alternative_fields(texts_by_tag, [time_tag], list) + or [] + ] + timestamps = [geo.as_unix_time(dt) if dt is not None else None for dt in dts] + if expected_length != len(timestamps): + LOG.warning( + "Found different number of timestamps %d and coordinates %d", + len(timestamps), + expected_length, + ) + return None + else: + timestamps = [0.0] * expected_length + return timestamps + + def _aggregate_gps_track( texts_by_tag: dict[str, list[str]], time_tag: str | None, @@ -159,73 +239,29 @@ def _aggregate_gps_track( expected_length = len(lats) # aggregate timestamps (optional) - if time_tag is not None: - dts = [ - exif_read.parse_gps_datetime(text) - for text in _extract_alternative_fields(texts_by_tag, [time_tag], list) - or [] - ] - timestamps = [geo.as_unix_time(dt) if dt is not None else None for dt in dts] - if expected_length != len(timestamps): - # no idea what to do if we have different number of timestamps and coordinates - LOG.warning( - "Found different number of timestamps %d and coordinates %d", - len(timestamps), - expected_length, - ) - return [] - else: - timestamps = [0.0] * expected_length + timestamps = _aggregate_timestamps(texts_by_tag, time_tag, expected_length) + if timestamps is None: + return [] assert len(timestamps) == expected_length - def _aggregate_float_values_same_length( - tag: str | None, - ) -> list[float | None]: - if tag is not None: - vals = [ - _maybe_float(val) - for val in _extract_alternative_fields(texts_by_tag, [tag], list) or [] - ] - else: - vals = [] - while len(vals) < expected_length: - vals.append(None) - return vals - # aggregate altitudes (optional) - alts = _aggregate_float_values_same_length(alt_tag) + alts = _aggregate_float_values_same_length(texts_by_tag, alt_tag, expected_length) # aggregate directions (optional) - directions = _aggregate_float_values_same_length(direction_tag) + directions = 
_aggregate_float_values_same_length( + texts_by_tag, direction_tag, expected_length + ) # aggregate speeds (optional) - ground_speeds = _aggregate_float_values_same_length(ground_speed_tag) + ground_speeds = _aggregate_float_values_same_length( + texts_by_tag, ground_speed_tag, expected_length + ) # GPS epoch times (optional) - if gps_time_tag is not None: - gps_epoch_times: list[float | None] = [ - geo.as_unix_time(dt) if dt is not None else None - for dt in ( - exif_read.parse_gps_datetime(text) - for text in _extract_alternative_fields( - texts_by_tag, [gps_time_tag], list - ) - or [] - ) - ] - if len(gps_epoch_times) != expected_length: - LOG.warning( - "Found different number of GPS epoch times %d and coordinates %d", - len(gps_epoch_times), - expected_length, - ) - gps_epoch_times = [None] * expected_length - elif time_tag is not None: - # Use per-point GPS timestamps as epoch times - gps_epoch_times = [t for t in timestamps] - else: - gps_epoch_times = [None] * expected_length + gps_epoch_times = _aggregate_epoch_times( + texts_by_tag, gps_time_tag, time_tag, timestamps, expected_length + ) # build track track: list[GPSPoint] = [] diff --git a/mapillary_tools/gpmf/gpmf_parser.py b/mapillary_tools/gpmf/gpmf_parser.py index 1ba3c8db..e82329b7 100644 --- a/mapillary_tools/gpmf/gpmf_parser.py +++ b/mapillary_tools/gpmf/gpmf_parser.py @@ -561,6 +561,37 @@ def _find_first_telemetry_stream(stream: T.Sequence[KLVDict], key: bytes): return values +_XYZDataT = T.TypeVar( + "_XYZDataT", + telemetry.AccelerationData, + telemetry.GyroscopeData, + telemetry.MagnetometerData, +) + + +def _accumulate_xyz_telemetry( + device_data: T.Sequence[KLVDict], + sample: Sample, + stream_key: bytes, + data_class: T.Type[_XYZDataT], + output_dict: dict[int, list[_XYZDataT]], + device_id: int, +) -> None: + """Extract XYZ telemetry (ACCL/GYRO/MAGN) from a device and accumulate into output_dict.""" + samples = _find_first_telemetry_stream(device_data, stream_key) + if samples: + 
avg_delta = sample.exact_timedelta / len(samples) + output_dict.setdefault(device_id, []).extend( + data_class( + time=sample.exact_time + avg_delta * idx, + x=x, + y=y, + z=z, + ) + for idx, (z, x, y, *_) in enumerate(samples) + ) + + def _backfill_gps_timestamps(gps_points: T.Iterable[telemetry.GPSPoint]) -> None: it = iter(gps_points) @@ -626,49 +657,34 @@ def _load_telemetry_from_samples( device_points.extend(sample_points) if accls_by_dvid is not None: - sample_accls = _find_first_telemetry_stream(device["data"], b"ACCL") - if sample_accls: - # interpolate timestamps in between - avg_delta = sample.exact_timedelta / len(sample_accls) - accls_by_dvid.setdefault(device_id, []).extend( - telemetry.AccelerationData( - time=sample.exact_time + avg_delta * idx, - x=x, - y=y, - z=z, - ) - for idx, (z, x, y, *_) in enumerate(sample_accls) - ) + _accumulate_xyz_telemetry( + device["data"], + sample, + b"ACCL", + telemetry.AccelerationData, + accls_by_dvid, + device_id, + ) if gyros_by_dvid is not None: - sample_gyros = _find_first_telemetry_stream(device["data"], b"GYRO") - if sample_gyros: - # interpolate timestamps in between - avg_delta = sample.exact_timedelta / len(sample_gyros) - gyros_by_dvid.setdefault(device_id, []).extend( - telemetry.GyroscopeData( - time=sample.exact_time + avg_delta * idx, - x=x, - y=y, - z=z, - ) - for idx, (z, x, y, *_) in enumerate(sample_gyros) - ) + _accumulate_xyz_telemetry( + device["data"], + sample, + b"GYRO", + telemetry.GyroscopeData, + gyros_by_dvid, + device_id, + ) if magns_by_dvid is not None: - sample_magns = _find_first_telemetry_stream(device["data"], b"MAGN") - if sample_magns: - # interpolate timestamps in between - avg_delta = sample.exact_timedelta / len(sample_magns) - magns_by_dvid.setdefault(device_id, []).extend( - telemetry.MagnetometerData( - time=sample.exact_time + avg_delta * idx, - x=x, - y=y, - z=z, - ) - for idx, (z, x, y, *_) in enumerate(sample_magns) - ) + _accumulate_xyz_telemetry( + device["data"], + 
sample, + b"MAGN", + telemetry.MagnetometerData, + magns_by_dvid, + device_id, + ) return device_found