diff --git a/mapillary_tools/blackvue_parser.py b/mapillary_tools/blackvue_parser.py
index ffb1bc4b..625c95dc 100644
--- a/mapillary_tools/blackvue_parser.py
+++ b/mapillary_tools/blackvue_parser.py
@@ -193,6 +193,50 @@ def _parse_nmea_lines(
yield epoch_ms, message
+def _detect_timezone_offset(
+ parsed_lines: list[tuple[float, pynmea2.NMEASentence]],
+) -> float:
+ """
+ Detect timezone offset between camera clock and GPS time.
+
+ Tries RMC messages first (most reliable - has full date+time),
+ then falls back to GGA/GLL (less reliable - time only, no date).
+ Returns 0.0 if no offset could be determined.
+ """
+ first_valid_gga_gll: tuple[float, pynmea2.NMEASentence] | None = None
+
+ for epoch_sec, message in parsed_lines:
+ if message.sentence_type == "RMC":
+ if hasattr(message, "is_valid") and message.is_valid:
+ offset = _compute_timezone_offset_from_rmc(epoch_sec, message)
+ if offset is not None:
+ LOG.debug(
+ "Computed timezone offset %.1fs from RMC (%s %s)",
+ offset,
+ message.datestamp,
+ message.timestamp,
+ )
+ return offset
+
+ if first_valid_gga_gll is None and message.sentence_type in ["GGA", "GLL"]:
+ if hasattr(message, "is_valid") and message.is_valid:
+ first_valid_gga_gll = (epoch_sec, message)
+
+    # Fallback: no usable RMC-based offset was derived, try GGA/GLL (less reliable - no date info)
+ if first_valid_gga_gll is not None:
+ epoch_sec, message = first_valid_gga_gll
+ offset = _compute_timezone_offset_from_time_only(epoch_sec, message)
+ if offset is not None:
+ LOG.debug(
+ "Computed timezone offset %.1fs from %s (fallback, no date info)",
+ offset,
+ message.sentence_type,
+ )
+ return offset
+
+ return 0.0
+
+
def _parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]:
"""
>>> list(_parse_gps_box(b"[1623057074211]$GPGGA,202530.00,5109.0262,N,11401.8407,W,5,40,0.5,1097.36,M,-17.00,M,18,TSTR*61"))
@@ -210,83 +254,47 @@ def _parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]:
>>> list(_parse_gps_box(b"[1623057074211]$GPVTG,,T,,M,0.078,N,0.144,K,D*28[1623057075215]"))
[]
"""
- timezone_offset: float | None = None
parsed_lines: list[tuple[float, pynmea2.NMEASentence]] = []
- first_valid_gga_gll: tuple[float, pynmea2.NMEASentence] | None = None
- # First pass: collect parsed_lines and compute timezone offset from the first valid RMC message
+ # First pass: collect parsed_lines
for epoch_ms, message in _parse_nmea_lines(gps_data):
# Rounding needed to avoid floating point precision issues
epoch_sec = round(epoch_ms / 1000, 3)
parsed_lines.append((epoch_sec, message))
- if timezone_offset is None and message.sentence_type == "RMC":
- if hasattr(message, "is_valid") and message.is_valid:
- timezone_offset = _compute_timezone_offset_from_rmc(epoch_sec, message)
- if timezone_offset is not None:
- LOG.debug(
- "Computed timezone offset %.1fs from RMC (%s %s)",
- timezone_offset,
- message.datestamp,
- message.timestamp,
- )
- # Track first valid GGA/GLL for fallback
- if first_valid_gga_gll is None and message.sentence_type in ["GGA", "GLL"]:
- if hasattr(message, "is_valid") and message.is_valid:
- first_valid_gga_gll = (epoch_sec, message)
-
- # Fallback: if no RMC found, try GGA/GLL (less reliable - no date info)
- if timezone_offset is None and first_valid_gga_gll is not None:
- epoch_sec, message = first_valid_gga_gll
- timezone_offset = _compute_timezone_offset_from_time_only(epoch_sec, message)
- if timezone_offset is not None:
- LOG.debug(
- "Computed timezone offset %.1fs from %s (fallback, no date info)",
- timezone_offset,
- message.sentence_type,
- )
- # If no offset could be determined, use 0 (camera clock assumed correct)
- if timezone_offset is None:
- timezone_offset = 0.0
+ timezone_offset = _detect_timezone_offset(parsed_lines)
points_by_sentence_type: dict[str, list[telemetry.GPSPoint]] = {}
# Second pass: apply offset to all GPS points
for epoch_sec, message in parsed_lines:
+ if message.sentence_type not in ("GGA", "RMC", "GLL"):
+ continue
+ if not message.is_valid:
+ continue
+
corrected_epoch = round(epoch_sec + timezone_offset, 3)
- # https://tavotech.com/gps-nmea-sentence-structure/
- if message.sentence_type in ["GGA"]:
- if not message.is_valid:
- continue
- point = telemetry.GPSPoint(
- time=corrected_epoch,
- lat=message.latitude,
- lon=message.longitude,
- alt=message.altitude,
- angle=None,
- epoch_time=corrected_epoch,
- fix=telemetry.GPSFix.FIX_3D if message.gps_qual >= 1 else None,
- precision=None,
- ground_speed=None,
- )
- points_by_sentence_type.setdefault(message.sentence_type, []).append(point)
-
- elif message.sentence_type in ["RMC", "GLL"]:
- if not message.is_valid:
- continue
- point = telemetry.GPSPoint(
- time=corrected_epoch,
- lat=message.latitude,
- lon=message.longitude,
- alt=None,
- angle=None,
- epoch_time=corrected_epoch,
- fix=None,
- precision=None,
- ground_speed=None,
- )
- points_by_sentence_type.setdefault(message.sentence_type, []).append(point)
+ # GGA has altitude and fix; RMC and GLL do not
+ if message.sentence_type == "GGA":
+ alt = message.altitude
+ fix = telemetry.GPSFix.FIX_3D if message.gps_qual >= 1 else None
+ else:
+ alt = None
+ fix = None
+
+ point = telemetry.GPSPoint(
+ time=corrected_epoch,
+ lat=message.latitude,
+ lon=message.longitude,
+ alt=alt,
+ angle=None,
+ epoch_time=corrected_epoch,
+ fix=fix,
+ precision=None,
+ ground_speed=None,
+ )
+ points_by_sentence_type.setdefault(message.sentence_type, []).append(point)
# This is the extraction order in exiftool
if "RMC" in points_by_sentence_type:
diff --git a/mapillary_tools/exiftool_read_video.py b/mapillary_tools/exiftool_read_video.py
index a5c2fcc0..66d0d0ad 100644
--- a/mapillary_tools/exiftool_read_video.py
+++ b/mapillary_tools/exiftool_read_video.py
@@ -120,6 +120,86 @@ def _deduplicate_gps_points(
return deduplicated_track
+def _aggregate_float_values_same_length(
+ texts_by_tag: dict[str, list[str]],
+ tag: str | None,
+ expected_length: int,
+) -> list[float | None]:
+ if tag is not None:
+ vals = [
+ _maybe_float(val)
+ for val in _extract_alternative_fields(texts_by_tag, [tag], list) or []
+ ]
+ else:
+ vals = []
+ while len(vals) < expected_length:
+ vals.append(None)
+ return vals
+
+
+def _aggregate_epoch_times(
+ texts_by_tag: dict[str, list[str]],
+ gps_time_tag: str | None,
+ time_tag: str | None,
+ timestamps: list[float | None],
+ expected_length: int,
+) -> list[float | None]:
+ """Aggregate GPS epoch times from tags, with fallback to per-point timestamps."""
+ if gps_time_tag is not None:
+ gps_epoch_times: list[float | None] = [
+ geo.as_unix_time(dt) if dt is not None else None
+ for dt in (
+ exif_read.parse_gps_datetime(text)
+ for text in _extract_alternative_fields(
+ texts_by_tag, [gps_time_tag], list
+ )
+ or []
+ )
+ ]
+ if len(gps_epoch_times) != expected_length:
+ LOG.warning(
+ "Found different number of GPS epoch times %d and coordinates %d",
+ len(gps_epoch_times),
+ expected_length,
+ )
+ gps_epoch_times = [None] * expected_length
+ return gps_epoch_times
+ elif time_tag is not None:
+ # Use per-point GPS timestamps as epoch times
+ return [t for t in timestamps]
+ else:
+ return [None] * expected_length
+
+
+def _aggregate_timestamps(
+ texts_by_tag: dict[str, list[str]],
+ time_tag: str | None,
+ expected_length: int,
+) -> list[float | None] | None:
+ """Aggregate timestamps from the time tag.
+
+ Returns the timestamp list, or None if the lengths don't match
+ (caller should return [] in that case).
+ """
+ if time_tag is not None:
+ dts = [
+ exif_read.parse_gps_datetime(text)
+ for text in _extract_alternative_fields(texts_by_tag, [time_tag], list)
+ or []
+ ]
+ timestamps = [geo.as_unix_time(dt) if dt is not None else None for dt in dts]
+ if expected_length != len(timestamps):
+ LOG.warning(
+ "Found different number of timestamps %d and coordinates %d",
+ len(timestamps),
+ expected_length,
+ )
+ return None
+ else:
+ timestamps = [0.0] * expected_length
+ return timestamps
+
+
def _aggregate_gps_track(
texts_by_tag: dict[str, list[str]],
time_tag: str | None,
@@ -159,73 +239,29 @@ def _aggregate_gps_track(
expected_length = len(lats)
# aggregate timestamps (optional)
- if time_tag is not None:
- dts = [
- exif_read.parse_gps_datetime(text)
- for text in _extract_alternative_fields(texts_by_tag, [time_tag], list)
- or []
- ]
- timestamps = [geo.as_unix_time(dt) if dt is not None else None for dt in dts]
- if expected_length != len(timestamps):
- # no idea what to do if we have different number of timestamps and coordinates
- LOG.warning(
- "Found different number of timestamps %d and coordinates %d",
- len(timestamps),
- expected_length,
- )
- return []
- else:
- timestamps = [0.0] * expected_length
+ timestamps = _aggregate_timestamps(texts_by_tag, time_tag, expected_length)
+ if timestamps is None:
+ return []
assert len(timestamps) == expected_length
- def _aggregate_float_values_same_length(
- tag: str | None,
- ) -> list[float | None]:
- if tag is not None:
- vals = [
- _maybe_float(val)
- for val in _extract_alternative_fields(texts_by_tag, [tag], list) or []
- ]
- else:
- vals = []
- while len(vals) < expected_length:
- vals.append(None)
- return vals
-
# aggregate altitudes (optional)
- alts = _aggregate_float_values_same_length(alt_tag)
+ alts = _aggregate_float_values_same_length(texts_by_tag, alt_tag, expected_length)
# aggregate directions (optional)
- directions = _aggregate_float_values_same_length(direction_tag)
+ directions = _aggregate_float_values_same_length(
+ texts_by_tag, direction_tag, expected_length
+ )
# aggregate speeds (optional)
- ground_speeds = _aggregate_float_values_same_length(ground_speed_tag)
+ ground_speeds = _aggregate_float_values_same_length(
+ texts_by_tag, ground_speed_tag, expected_length
+ )
# GPS epoch times (optional)
- if gps_time_tag is not None:
- gps_epoch_times: list[float | None] = [
- geo.as_unix_time(dt) if dt is not None else None
- for dt in (
- exif_read.parse_gps_datetime(text)
- for text in _extract_alternative_fields(
- texts_by_tag, [gps_time_tag], list
- )
- or []
- )
- ]
- if len(gps_epoch_times) != expected_length:
- LOG.warning(
- "Found different number of GPS epoch times %d and coordinates %d",
- len(gps_epoch_times),
- expected_length,
- )
- gps_epoch_times = [None] * expected_length
- elif time_tag is not None:
- # Use per-point GPS timestamps as epoch times
- gps_epoch_times = [t for t in timestamps]
- else:
- gps_epoch_times = [None] * expected_length
+ gps_epoch_times = _aggregate_epoch_times(
+ texts_by_tag, gps_time_tag, time_tag, timestamps, expected_length
+ )
# build track
track: list[GPSPoint] = []
diff --git a/mapillary_tools/gpmf/gpmf_parser.py b/mapillary_tools/gpmf/gpmf_parser.py
index 1ba3c8db..e82329b7 100644
--- a/mapillary_tools/gpmf/gpmf_parser.py
+++ b/mapillary_tools/gpmf/gpmf_parser.py
@@ -561,6 +561,37 @@ def _find_first_telemetry_stream(stream: T.Sequence[KLVDict], key: bytes):
return values
+_XYZDataT = T.TypeVar(
+ "_XYZDataT",
+ telemetry.AccelerationData,
+ telemetry.GyroscopeData,
+ telemetry.MagnetometerData,
+)
+
+
+def _accumulate_xyz_telemetry(
+ device_data: T.Sequence[KLVDict],
+ sample: Sample,
+ stream_key: bytes,
+ data_class: T.Type[_XYZDataT],
+ output_dict: dict[int, list[_XYZDataT]],
+ device_id: int,
+) -> None:
+ """Extract XYZ telemetry (ACCL/GYRO/MAGN) from a device and accumulate into output_dict."""
+ samples = _find_first_telemetry_stream(device_data, stream_key)
+ if samples:
+ avg_delta = sample.exact_timedelta / len(samples)
+ output_dict.setdefault(device_id, []).extend(
+ data_class(
+ time=sample.exact_time + avg_delta * idx,
+ x=x,
+ y=y,
+ z=z,
+ )
+ for idx, (z, x, y, *_) in enumerate(samples)
+ )
+
+
def _backfill_gps_timestamps(gps_points: T.Iterable[telemetry.GPSPoint]) -> None:
it = iter(gps_points)
@@ -626,49 +657,34 @@ def _load_telemetry_from_samples(
device_points.extend(sample_points)
if accls_by_dvid is not None:
- sample_accls = _find_first_telemetry_stream(device["data"], b"ACCL")
- if sample_accls:
- # interpolate timestamps in between
- avg_delta = sample.exact_timedelta / len(sample_accls)
- accls_by_dvid.setdefault(device_id, []).extend(
- telemetry.AccelerationData(
- time=sample.exact_time + avg_delta * idx,
- x=x,
- y=y,
- z=z,
- )
- for idx, (z, x, y, *_) in enumerate(sample_accls)
- )
+ _accumulate_xyz_telemetry(
+ device["data"],
+ sample,
+ b"ACCL",
+ telemetry.AccelerationData,
+ accls_by_dvid,
+ device_id,
+ )
if gyros_by_dvid is not None:
- sample_gyros = _find_first_telemetry_stream(device["data"], b"GYRO")
- if sample_gyros:
- # interpolate timestamps in between
- avg_delta = sample.exact_timedelta / len(sample_gyros)
- gyros_by_dvid.setdefault(device_id, []).extend(
- telemetry.GyroscopeData(
- time=sample.exact_time + avg_delta * idx,
- x=x,
- y=y,
- z=z,
- )
- for idx, (z, x, y, *_) in enumerate(sample_gyros)
- )
+ _accumulate_xyz_telemetry(
+ device["data"],
+ sample,
+ b"GYRO",
+ telemetry.GyroscopeData,
+ gyros_by_dvid,
+ device_id,
+ )
if magns_by_dvid is not None:
- sample_magns = _find_first_telemetry_stream(device["data"], b"MAGN")
- if sample_magns:
- # interpolate timestamps in between
- avg_delta = sample.exact_timedelta / len(sample_magns)
- magns_by_dvid.setdefault(device_id, []).extend(
- telemetry.MagnetometerData(
- time=sample.exact_time + avg_delta * idx,
- x=x,
- y=y,
- z=z,
- )
- for idx, (z, x, y, *_) in enumerate(sample_magns)
- )
+ _accumulate_xyz_telemetry(
+ device["data"],
+ sample,
+ b"MAGN",
+ telemetry.MagnetometerData,
+ magns_by_dvid,
+ device_id,
+ )
return device_found
diff --git a/tests/unit/test_exiftool_read_video.py b/tests/unit/test_exiftool_read_video.py
new file mode 100644
index 00000000..ced1c502
--- /dev/null
+++ b/tests/unit/test_exiftool_read_video.py
@@ -0,0 +1,1340 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+#
+# This source code is licensed under the BSD license found in the
+# LICENSE file in the root directory of this source tree.
+
+from __future__ import annotations
+
+import xml.etree.ElementTree as ET
+
+import pytest
+
+from mapillary_tools.exiftool_read_video import (
+ _aggregate_gps_track,
+ _aggregate_gps_track_by_sample_time,
+ _aggregate_samples,
+ _deduplicate_gps_points,
+ _extract_alternative_fields,
+ _index_text_by_tag,
+ _same_gps_point,
+ ExifToolReadVideo,
+ expand_tag,
+)
+from mapillary_tools.telemetry import GPSFix, GPSPoint
+
+
+# ---------------------------------------------------------------------------
+# Helper: build an ElementTree from raw XML strings
+# ---------------------------------------------------------------------------
+
+
+def _etree_from_xml(xml_str: str) -> ET.ElementTree:
+ """Parse XML and return an ElementTree rooted at rdf:Description.
+
+ In production, ExifToolReadVideo receives an ElementTree whose root is the
+ rdf:Description element (not the enclosing rdf:RDF). This helper mimics
+ that behaviour: it parses the full XML, finds the first rdf:Description
+ child, and wraps it in an ElementTree.
+ """
+ rdf_root = ET.fromstring(xml_str)
+ rdf_ns = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+ desc = rdf_root.find(f"{{{rdf_ns}}}Description")
+ if desc is None:
+ raise ValueError("No rdf:Description found in the XML")
+ return ET.ElementTree(desc)
+
+
+def _make_element(tag: str, text: str) -> ET.Element:
+ el = ET.Element(expand_tag(tag))
+ el.text = text
+ return el
+
+
+# ---------------------------------------------------------------------------
+# Real-world-style XML fixtures
+# ---------------------------------------------------------------------------
+
+BLACKVUE_XML = """\
+
+
+
+ BlackVue
+ DR900S-2CH
+ BV900S123456
+ 2019:09:02 10:23:28.00Z
+ 37.265547
+ 28.213497
+ 52.7561
+ 133.46
+ 402.9
+ 2019:09:02 10:23:29.00Z
+ 37.265461
+ 28.213611
+ 50.0466
+ 133.11
+ 402.8
+ 2019:09:02 10:23:30.00Z
+ 37.265378
+ 28.213722
+ 47.6205
+ 132.83
+ 402.8
+
+
+"""
+
+# GoPro Track-based GPS (Track namespace).
+# Note: SampleTime/SampleDuration are plain floats (exiftool XML numeric output).
+GOPRO_XML = """\
+
+
+
+ GoPro
+ GoPro Max
+ C3456789012345
+ 0
+ 1.001
+ 47.359832
+ 8.522706
+ 414.9
+ 2022:07:31 00:25:23.200Z
+ 5.376
+ 167.58
+ 3
+ 2.19
+ 1.001
+ 1.001
+ 47.359810
+ 8.522680
+ 415.2
+ 2022:07:31 00:25:24.200Z
+ 4.992
+ 168.23
+ 3
+ 2.15
+
+
+"""
+
+INSTA360_XML = """\
+
+
+
+ Insta360
+ Insta360 X3
+ ISN12345678
+ 2023:09:23 15:13:34.00Z
+ 47.371
+ 8.542
+ 408.5
+ 2023:09:23 15:13:35.00Z
+ 47.372
+ 8.543
+ 409.1
+
+
+"""
+
+
+# ---------------------------------------------------------------------------
+# _index_text_by_tag
+# ---------------------------------------------------------------------------
+
+
+class TestIndexTextByTag:
+ def test_basic_indexing(self):
+ elements = [
+ _make_element("QuickTime:GPSLatitude", "37.0"),
+ _make_element("QuickTime:GPSLongitude", "28.0"),
+ _make_element("QuickTime:GPSLatitude", "38.0"),
+ ]
+ result = _index_text_by_tag(elements)
+ lat_tag = expand_tag("QuickTime:GPSLatitude")
+ lon_tag = expand_tag("QuickTime:GPSLongitude")
+ assert result[lat_tag] == ["37.0", "38.0"]
+ assert result[lon_tag] == ["28.0"]
+
+ def test_empty_elements(self):
+ result = _index_text_by_tag([])
+ assert result == {}
+
+ def test_element_with_no_text_is_skipped(self):
+ el = ET.Element(expand_tag("QuickTime:GPSLatitude"))
+ # el.text is None by default
+ result = _index_text_by_tag([el])
+ assert result == {}
+
+ def test_mixed_text_and_none(self):
+ el1 = _make_element("QuickTime:GPSLatitude", "37.0")
+ el2 = ET.Element(expand_tag("QuickTime:GPSLatitude"))
+ el3 = _make_element("QuickTime:GPSLatitude", "38.0")
+ result = _index_text_by_tag([el1, el2, el3])
+ lat_tag = expand_tag("QuickTime:GPSLatitude")
+ # Only elements with text are included
+ assert result[lat_tag] == ["37.0", "38.0"]
+
+
+# ---------------------------------------------------------------------------
+# _extract_alternative_fields
+# ---------------------------------------------------------------------------
+
+
+class TestExtractAlternativeFields:
+ def test_extract_float_value(self):
+ texts = {expand_tag("QuickTime:GPSLatitude"): ["37.265547"]}
+ result = _extract_alternative_fields(texts, ["QuickTime:GPSLatitude"], float)
+ assert result == pytest.approx(37.265547)
+
+ def test_extract_int_value(self):
+ texts = {expand_tag("Track1:GPSMeasureMode"): ["3"]}
+ result = _extract_alternative_fields(texts, ["Track1:GPSMeasureMode"], int)
+ assert result == 3
+
+ def test_extract_str_value(self):
+ texts = {expand_tag("IFD0:Make"): ["BlackVue"]}
+ result = _extract_alternative_fields(texts, ["IFD0:Make"], str)
+ assert result == "BlackVue"
+
+ def test_extract_list_value(self):
+ texts = {expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"]}
+ result = _extract_alternative_fields(texts, ["QuickTime:GPSLatitude"], list)
+ assert result == ["37.0", "38.0"]
+
+ def test_returns_none_for_missing_field(self):
+ texts = {expand_tag("QuickTime:GPSLatitude"): ["37.0"]}
+ result = _extract_alternative_fields(texts, ["QuickTime:GPSLongitude"], float)
+ assert result is None
+
+ def test_alternative_fallback(self):
+ """First field is missing, second is present."""
+ texts = {expand_tag("UserData:Make"): ["TestMake"]}
+ result = _extract_alternative_fields(texts, ["IFD0:Make", "UserData:Make"], str)
+ assert result == "TestMake"
+
+ def test_first_alternative_preferred(self):
+ """When both fields are present, first is used."""
+ texts = {
+ expand_tag("IFD0:Make"): ["First"],
+ expand_tag("UserData:Make"): ["Second"],
+ }
+ result = _extract_alternative_fields(texts, ["IFD0:Make", "UserData:Make"], str)
+ assert result == "First"
+
+ def test_invalid_float_returns_none(self):
+ texts = {expand_tag("QuickTime:GPSLatitude"): ["not_a_number"]}
+ result = _extract_alternative_fields(texts, ["QuickTime:GPSLatitude"], float)
+ assert result is None
+
+ def test_invalid_int_returns_none(self):
+ texts = {expand_tag("Track1:GPSMeasureMode"): ["abc"]}
+ result = _extract_alternative_fields(texts, ["Track1:GPSMeasureMode"], int)
+ assert result is None
+
+ def test_all_fields_missing_returns_none(self):
+ result = _extract_alternative_fields({}, ["IFD0:Make", "UserData:Make"], str)
+ assert result is None
+
+ def test_invalid_field_type_raises(self):
+ texts = {expand_tag("IFD0:Make"): ["val"]}
+ with pytest.raises(ValueError, match="Invalid field type"):
+ _extract_alternative_fields(texts, ["IFD0:Make"], dict) # type: ignore
+
+
+# ---------------------------------------------------------------------------
+# _same_gps_point and _deduplicate_gps_points
+# ---------------------------------------------------------------------------
+
+
+class TestSameGpsPoint:
+ def _make_point(self, **overrides) -> GPSPoint:
+ defaults = dict(
+ time=0.0,
+ lat=37.0,
+ lon=28.0,
+ alt=400.0,
+ angle=133.0,
+ epoch_time=None,
+ fix=None,
+ precision=None,
+ ground_speed=None,
+ )
+ defaults.update(overrides)
+ return GPSPoint(**defaults)
+
+ def test_identical_points_are_same(self):
+ p = self._make_point()
+ assert _same_gps_point(p, p) is True
+
+ def test_different_alt_are_same(self):
+ """Altitude does not affect sameness."""
+ p1 = self._make_point(alt=400.0)
+ p2 = self._make_point(alt=500.0)
+ assert _same_gps_point(p1, p2) is True
+
+ def test_different_lat_are_not_same(self):
+ p1 = self._make_point(lat=37.0)
+ p2 = self._make_point(lat=38.0)
+ assert _same_gps_point(p1, p2) is False
+
+ def test_different_lon_are_not_same(self):
+ p1 = self._make_point(lon=28.0)
+ p2 = self._make_point(lon=29.0)
+ assert _same_gps_point(p1, p2) is False
+
+ def test_different_time_are_not_same(self):
+ p1 = self._make_point(time=0.0)
+ p2 = self._make_point(time=1.0)
+ assert _same_gps_point(p1, p2) is False
+
+ def test_different_angle_are_not_same(self):
+ p1 = self._make_point(angle=100.0)
+ p2 = self._make_point(angle=200.0)
+ assert _same_gps_point(p1, p2) is False
+
+ def test_different_epoch_time_are_not_same(self):
+ p1 = self._make_point(epoch_time=1000.0)
+ p2 = self._make_point(epoch_time=2000.0)
+ assert _same_gps_point(p1, p2) is False
+
+ def test_different_ground_speed_are_same(self):
+ """ground_speed is not checked by _same_gps_point."""
+ p1 = self._make_point(ground_speed=10.0)
+ p2 = self._make_point(ground_speed=50.0)
+ assert _same_gps_point(p1, p2) is True
+
+
+class TestDeduplicateGpsPoints:
+ def _make_point(self, **overrides) -> GPSPoint:
+ defaults = dict(
+ time=0.0,
+ lat=37.0,
+ lon=28.0,
+ alt=400.0,
+ angle=133.0,
+ epoch_time=None,
+ fix=None,
+ precision=None,
+ ground_speed=None,
+ )
+ defaults.update(overrides)
+ return GPSPoint(**defaults)
+
+ def test_empty_track(self):
+ assert _deduplicate_gps_points([], _same_gps_point) == []
+
+ def test_no_duplicates(self):
+ track = [
+ self._make_point(time=0.0, lat=37.0),
+ self._make_point(time=1.0, lat=37.1),
+ ]
+ result = _deduplicate_gps_points(track, _same_gps_point)
+ assert len(result) == 2
+
+ def test_consecutive_duplicates_removed(self):
+ p = self._make_point(time=0.0, lat=37.0)
+ track = [p, p, p]
+ result = _deduplicate_gps_points(track, _same_gps_point)
+ assert len(result) == 1
+
+ def test_non_consecutive_duplicates_kept(self):
+ p1 = self._make_point(time=0.0, lat=37.0)
+ p2 = self._make_point(time=1.0, lat=38.0)
+ p3 = self._make_point(time=0.0, lat=37.0) # same as p1 but not consecutive
+ track = [p1, p2, p3]
+ result = _deduplicate_gps_points(track, _same_gps_point)
+ assert len(result) == 3
+
+ def test_only_alt_differs_is_duplicate(self):
+ """Points differing only by altitude are considered same."""
+ p1 = self._make_point(alt=400.0)
+ p2 = self._make_point(alt=500.0)
+ result = _deduplicate_gps_points([p1, p2], _same_gps_point)
+ assert len(result) == 1
+
+
+# ---------------------------------------------------------------------------
+# _aggregate_gps_track
+# ---------------------------------------------------------------------------
+
+
+class TestAggregateGpsTrack:
+ def test_basic_quicktime_track(self):
+ texts = {
+ expand_tag("QuickTime:GPSLongitude"): [
+ "28.213497",
+ "28.213611",
+ "28.213722",
+ ],
+ expand_tag("QuickTime:GPSLatitude"): [
+ "37.265547",
+ "37.265461",
+ "37.265378",
+ ],
+ expand_tag("QuickTime:GPSDateTime"): [
+ "2019:09:02 10:23:28.00Z",
+ "2019:09:02 10:23:29.00Z",
+ "2019:09:02 10:23:30.00Z",
+ ],
+ expand_tag("QuickTime:GPSAltitude"): ["402.9", "402.8", "402.8"],
+ expand_tag("QuickTime:GPSTrack"): ["133.46", "133.11", "132.83"],
+ }
+ track = _aggregate_gps_track(
+ texts,
+ time_tag="QuickTime:GPSDateTime",
+ lon_tag="QuickTime:GPSLongitude",
+ lat_tag="QuickTime:GPSLatitude",
+ alt_tag="QuickTime:GPSAltitude",
+ direction_tag="QuickTime:GPSTrack",
+ gps_time_tag="QuickTime:GPSDateTime",
+ )
+ assert len(track) == 3
+ # Times should be normalized relative to first point
+ assert track[0].time == pytest.approx(0.0)
+ assert track[1].time == pytest.approx(1.0)
+ assert track[2].time == pytest.approx(2.0)
+ # Check coordinates
+ assert track[0].lat == pytest.approx(37.265547)
+ assert track[0].lon == pytest.approx(28.213497)
+ assert track[0].alt == pytest.approx(402.9)
+ assert track[0].angle == pytest.approx(133.46)
+ # epoch_time should be set from gps_time_tag
+ for p in track:
+ assert p.epoch_time is not None
+
+ def test_mismatched_lon_lat_returns_empty(self):
+ texts = {
+ expand_tag("QuickTime:GPSLongitude"): ["28.0", "29.0"],
+ expand_tag("QuickTime:GPSLatitude"): ["37.0"],
+ }
+ track = _aggregate_gps_track(
+ texts,
+ time_tag=None,
+ lon_tag="QuickTime:GPSLongitude",
+ lat_tag="QuickTime:GPSLatitude",
+ )
+ assert track == []
+
+ def test_mismatched_timestamps_returns_empty(self):
+ texts = {
+ expand_tag("QuickTime:GPSLongitude"): ["28.0", "29.0"],
+ expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"],
+ expand_tag("QuickTime:GPSDateTime"): [
+ "2019:09:02 10:23:28.00Z",
+ ],
+ }
+ track = _aggregate_gps_track(
+ texts,
+ time_tag="QuickTime:GPSDateTime",
+ lon_tag="QuickTime:GPSLongitude",
+ lat_tag="QuickTime:GPSLatitude",
+ )
+ assert track == []
+
+ def test_no_time_tag_uses_zero(self):
+ texts = {
+ expand_tag("QuickTime:GPSLongitude"): ["28.0", "29.0"],
+ expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"],
+ }
+ track = _aggregate_gps_track(
+ texts,
+ time_tag=None,
+ lon_tag="QuickTime:GPSLongitude",
+ lat_tag="QuickTime:GPSLatitude",
+ )
+ # Without time_tag, all times are 0.0 but points differ in lat/lon
+ # Deduplication checks time+lat+lon so both should remain
+ assert len(track) == 2
+ for p in track:
+ assert p.time == 0.0
+ assert p.epoch_time is None
+
+ def test_empty_track(self):
+ track = _aggregate_gps_track(
+ {},
+ time_tag=None,
+ lon_tag="QuickTime:GPSLongitude",
+ lat_tag="QuickTime:GPSLatitude",
+ )
+ assert track == []
+
+ def test_none_lon_lat_values_skipped(self):
+ """If a value cannot be parsed as float, it is skipped."""
+ texts = {
+ expand_tag("QuickTime:GPSLongitude"): ["28.0", "invalid"],
+ expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"],
+ expand_tag("QuickTime:GPSDateTime"): [
+ "2019:09:02 10:00:00Z",
+ "2019:09:02 10:00:01Z",
+ ],
+ }
+ track = _aggregate_gps_track(
+ texts,
+ time_tag="QuickTime:GPSDateTime",
+ lon_tag="QuickTime:GPSLongitude",
+ lat_tag="QuickTime:GPSLatitude",
+ )
+ # Second point has invalid lon so it gets skipped
+ assert len(track) == 1
+
+ def test_altitude_and_direction_shorter_padded_with_none(self):
+ """Optional arrays shorter than coord arrays are padded with None."""
+ texts = {
+ expand_tag("QuickTime:GPSLongitude"): ["28.0", "29.0"],
+ expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"],
+ expand_tag("QuickTime:GPSAltitude"): ["400.0"], # only 1 for 2 coords
+ }
+ track = _aggregate_gps_track(
+ texts,
+ time_tag=None,
+ lon_tag="QuickTime:GPSLongitude",
+ lat_tag="QuickTime:GPSLatitude",
+ alt_tag="QuickTime:GPSAltitude",
+ )
+ assert len(track) == 2
+ assert track[0].alt == pytest.approx(400.0)
+ assert track[1].alt is None
+
+ def test_ground_speed_tag(self):
+ texts = {
+ expand_tag("QuickTime:GPSLongitude"): ["28.0"],
+ expand_tag("QuickTime:GPSLatitude"): ["37.0"],
+ expand_tag("QuickTime:GPSSpeed"): ["52.7561"],
+ }
+ track = _aggregate_gps_track(
+ texts,
+ time_tag=None,
+ lon_tag="QuickTime:GPSLongitude",
+ lat_tag="QuickTime:GPSLatitude",
+ ground_speed_tag="QuickTime:GPSSpeed",
+ )
+ assert len(track) == 1
+ assert track[0].ground_speed == pytest.approx(52.7561)
+
+ def test_gps_time_tag_length_mismatch_falls_back_to_none(self):
+ texts = {
+ expand_tag("QuickTime:GPSLongitude"): ["28.0", "29.0"],
+ expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"],
+ expand_tag("QuickTime:GPSDateTime"): [
+ "2019:09:02 10:00:00Z",
+ "2019:09:02 10:00:01Z",
+ ],
+ expand_tag("QuickTime:GPSTimeStamp"): [
+ "2019:09:02 10:00:10Z",
+ ],
+ }
+ track = _aggregate_gps_track(
+ texts,
+ time_tag="QuickTime:GPSDateTime",
+ lon_tag="QuickTime:GPSLongitude",
+ lat_tag="QuickTime:GPSLatitude",
+ gps_time_tag="QuickTime:GPSTimeStamp",
+ )
+ assert len(track) == 2
+ # Mismatch in gps_time_tag length: epoch_time falls back to None
+ for p in track:
+ assert p.epoch_time is None
+
+ def test_track_is_sorted_by_time(self):
+ """Points with out-of-order timestamps are sorted."""
+ texts = {
+ expand_tag("QuickTime:GPSLongitude"): ["28.0", "29.0"],
+ expand_tag("QuickTime:GPSLatitude"): ["37.0", "38.0"],
+ expand_tag("QuickTime:GPSDateTime"): [
+ "2019:09:02 10:00:02Z",
+ "2019:09:02 10:00:00Z",
+ ],
+ }
+ track = _aggregate_gps_track(
+ texts,
+ time_tag="QuickTime:GPSDateTime",
+ lon_tag="QuickTime:GPSLongitude",
+ lat_tag="QuickTime:GPSLatitude",
+ )
+ assert len(track) == 2
+ assert track[0].time < track[1].time
+
+
+# ---------------------------------------------------------------------------
+# _aggregate_samples
+# ---------------------------------------------------------------------------
+
+
+class TestAggregateSamples:
+ def test_basic_two_samples(self):
+ elements = [
+ _make_element("Track1:SampleTime", "0"),
+ _make_element("Track1:SampleDuration", "1.001"),
+ _make_element("Track1:GPSLatitude", "47.0"),
+ _make_element("Track1:GPSLongitude", "8.0"),
+ _make_element("Track1:SampleTime", "1.001"),
+ _make_element("Track1:SampleDuration", "1.001"),
+ _make_element("Track1:GPSLatitude", "47.1"),
+ _make_element("Track1:GPSLongitude", "8.1"),
+ ]
+ samples = list(
+ _aggregate_samples(elements, "Track1:SampleTime", "Track1:SampleDuration")
+ )
+ assert len(samples) == 2
+ # First sample
+ sample_time, sample_dur, elems = samples[0]
+ assert sample_time == pytest.approx(0.0)
+ assert sample_dur == pytest.approx(1.001)
+ # Elements are GPS data without SampleTime/SampleDuration
+ assert len(elems) == 2
+ # Second sample
+ sample_time2, sample_dur2, elems2 = samples[1]
+ assert sample_time2 == pytest.approx(1.001)
+ assert sample_dur2 == pytest.approx(1.001)
+ assert len(elems2) == 2
+
+ def test_empty_elements(self):
+ samples = list(
+ _aggregate_samples([], "Track1:SampleTime", "Track1:SampleDuration")
+ )
+ assert samples == []
+
+ def test_sample_time_none_skips_sample(self):
+ """If sample time cannot be parsed as float, the sample is skipped."""
+ elements = [
+ _make_element("Track1:SampleTime", "not_a_number"),
+ _make_element("Track1:SampleDuration", "1.0"),
+ _make_element("Track1:GPSLatitude", "47.0"),
+ ]
+ samples = list(
+ _aggregate_samples(elements, "Track1:SampleTime", "Track1:SampleDuration")
+ )
+        # The unparsable sample_time stays None, so the final yield for that group is skipped
+ assert len(samples) == 0
+
+ def test_missing_duration_skips_sample(self):
+ """If SampleDuration is missing, sample is not yielded."""
+ elements = [
+ _make_element("Track1:SampleTime", "0"),
+ # No SampleDuration element
+ _make_element("Track1:GPSLatitude", "47.0"),
+ ]
+ samples = list(
+ _aggregate_samples(elements, "Track1:SampleTime", "Track1:SampleDuration")
+ )
+ # sample_duration is None, so the sample is skipped
+ assert len(samples) == 0
+
+ def test_last_sample_is_yielded(self):
+ """The final sample (not followed by another SampleTime) is also yielded."""
+ elements = [
+ _make_element("Track1:SampleTime", "0"),
+ _make_element("Track1:SampleDuration", "2.0"),
+ _make_element("Track1:GPSLatitude", "47.0"),
+ ]
+ samples = list(
+ _aggregate_samples(elements, "Track1:SampleTime", "Track1:SampleDuration")
+ )
+ assert len(samples) == 1
+ sample_time, sample_dur, elems = samples[0]
+ assert sample_time == pytest.approx(0.0)
+ assert sample_dur == pytest.approx(2.0)
+ assert len(elems) == 1
+
+
+# ---------------------------------------------------------------------------
+# _aggregate_gps_track_by_sample_time
+# ---------------------------------------------------------------------------
+
+
class TestAggregateGpsTrackBySampleTime:
    """Unit tests for _aggregate_gps_track_by_sample_time.

    Each case feeds a pre-built iterable of (sample_time, sample_duration,
    elements) tuples and checks how the tagged values are mapped onto the
    resulting GPS track points (coordinates, fix, precision, timing).
    """

    def test_basic_sample_aggregation(self):
        track_ns = "Track1"
        elements = [
            _make_element(f"{track_ns}:GPSLongitude", "8.522706"),
            _make_element(f"{track_ns}:GPSLatitude", "47.359832"),
            _make_element(f"{track_ns}:GPSAltitude", "414.9"),
            _make_element(f"{track_ns}:GPSDateTime", "2022:07:31 00:25:23.200Z"),
            _make_element(f"{track_ns}:GPSSpeed", "5.376"),
            _make_element(f"{track_ns}:GPSTrack", "167.58"),
        ]
        sample_iterator = [(0.0, 1.001, elements)]
        track = _aggregate_gps_track_by_sample_time(
            sample_iterator,
            lon_tag=f"{track_ns}:GPSLongitude",
            lat_tag=f"{track_ns}:GPSLatitude",
            alt_tag=f"{track_ns}:GPSAltitude",
            gps_time_tag=f"{track_ns}:GPSDateTime",
            direction_tag=f"{track_ns}:GPSTrack",
            ground_speed_tag=f"{track_ns}:GPSSpeed",
        )
        assert len(track) == 1
        assert track[0].lat == pytest.approx(47.359832)
        assert track[0].lon == pytest.approx(8.522706)
        assert track[0].alt == pytest.approx(414.9)
        assert track[0].ground_speed == pytest.approx(5.376)
        assert track[0].angle == pytest.approx(167.58)

    def test_gps_fix_from_measure_mode(self):
        # GPSMeasureMode "3" maps to a 3D fix.
        track_ns = "Track1"
        elements = [
            _make_element(f"{track_ns}:GPSLongitude", "8.0"),
            _make_element(f"{track_ns}:GPSLatitude", "47.0"),
            _make_element(f"{track_ns}:GPSMeasureMode", "3"),
        ]
        sample_iterator = [(0.0, 1.0, elements)]
        track = _aggregate_gps_track_by_sample_time(
            sample_iterator,
            lon_tag=f"{track_ns}:GPSLongitude",
            lat_tag=f"{track_ns}:GPSLatitude",
            gps_fix_tag=f"{track_ns}:GPSMeasureMode",
        )
        assert len(track) == 1
        assert track[0].fix == GPSFix.FIX_3D

    def test_gps_fix_2d(self):
        # GPSMeasureMode "2" maps to a 2D fix.
        track_ns = "Track1"
        elements = [
            _make_element(f"{track_ns}:GPSLongitude", "8.0"),
            _make_element(f"{track_ns}:GPSLatitude", "47.0"),
            _make_element(f"{track_ns}:GPSMeasureMode", "2"),
        ]
        sample_iterator = [(0.0, 1.0, elements)]
        track = _aggregate_gps_track_by_sample_time(
            sample_iterator,
            lon_tag=f"{track_ns}:GPSLongitude",
            lat_tag=f"{track_ns}:GPSLatitude",
            gps_fix_tag=f"{track_ns}:GPSMeasureMode",
        )
        assert len(track) == 1
        assert track[0].fix == GPSFix.FIX_2D

    def test_gps_fix_no_fix(self):
        # GPSMeasureMode "0" maps to no fix.
        track_ns = "Track1"
        elements = [
            _make_element(f"{track_ns}:GPSLongitude", "8.0"),
            _make_element(f"{track_ns}:GPSLatitude", "47.0"),
            _make_element(f"{track_ns}:GPSMeasureMode", "0"),
        ]
        sample_iterator = [(0.0, 1.0, elements)]
        track = _aggregate_gps_track_by_sample_time(
            sample_iterator,
            lon_tag=f"{track_ns}:GPSLongitude",
            lat_tag=f"{track_ns}:GPSLatitude",
            gps_fix_tag=f"{track_ns}:GPSMeasureMode",
        )
        assert len(track) == 1
        assert track[0].fix == GPSFix.NO_FIX

    def test_invalid_gps_fix_is_none(self):
        # A value outside the known GPSFix enum leaves fix unset.
        track_ns = "Track1"
        elements = [
            _make_element(f"{track_ns}:GPSLongitude", "8.0"),
            _make_element(f"{track_ns}:GPSLatitude", "47.0"),
            _make_element(f"{track_ns}:GPSMeasureMode", "99"),
        ]
        sample_iterator = [(0.0, 1.0, elements)]
        track = _aggregate_gps_track_by_sample_time(
            sample_iterator,
            lon_tag=f"{track_ns}:GPSLongitude",
            lat_tag=f"{track_ns}:GPSLatitude",
            gps_fix_tag=f"{track_ns}:GPSMeasureMode",
        )
        assert len(track) == 1
        assert track[0].fix is None

    def test_gps_precision_scaled(self):
        """GPS precision should be multiplied by 100 (meters to GPSP units)."""
        track_ns = "Track1"
        elements = [
            _make_element(f"{track_ns}:GPSLongitude", "8.0"),
            _make_element(f"{track_ns}:GPSLatitude", "47.0"),
            _make_element(f"{track_ns}:GPSHPositioningError", "2.19"),
        ]
        sample_iterator = [(0.0, 1.0, elements)]
        track = _aggregate_gps_track_by_sample_time(
            sample_iterator,
            lon_tag=f"{track_ns}:GPSLongitude",
            lat_tag=f"{track_ns}:GPSLatitude",
            gps_precision_tag=f"{track_ns}:GPSHPositioningError",
        )
        assert len(track) == 1
        assert track[0].precision == pytest.approx(219.0)

    def test_multiple_points_per_sample_get_interpolated_time(self):
        """Multiple GPS points within a single sample get evenly spaced times."""
        track_ns = "Track1"
        elements = [
            _make_element(f"{track_ns}:GPSLongitude", "8.0"),
            _make_element(f"{track_ns}:GPSLatitude", "47.0"),
            _make_element(f"{track_ns}:GPSLongitude", "8.1"),
            _make_element(f"{track_ns}:GPSLatitude", "47.1"),
        ]
        # One 2-second sample holding two points -> 1s spacing.
        sample_iterator = [(0.0, 2.0, elements)]
        track = _aggregate_gps_track_by_sample_time(
            sample_iterator,
            lon_tag=f"{track_ns}:GPSLongitude",
            lat_tag=f"{track_ns}:GPSLatitude",
        )
        assert len(track) == 2
        assert track[0].time == pytest.approx(0.0)
        assert track[1].time == pytest.approx(1.0)

    def test_empty_sample_iterator(self):
        track = _aggregate_gps_track_by_sample_time(
            iter([]),
            lon_tag="Track1:GPSLongitude",
            lat_tag="Track1:GPSLatitude",
        )
        assert track == []

    def test_sample_with_no_gps_data(self):
        """A sample with no GPS elements produces no points."""
        sample_iterator = [(0.0, 1.0, [])]
        track = _aggregate_gps_track_by_sample_time(
            sample_iterator,
            lon_tag="Track1:GPSLongitude",
            lat_tag="Track1:GPSLatitude",
        )
        assert track == []

    def test_two_samples_sorted_by_time(self):
        # Samples arrive out of order; output must be time-sorted.
        track_ns = "Track1"
        # Sample 2 comes before sample 1 in input
        elements1 = [
            _make_element(f"{track_ns}:GPSLongitude", "8.0"),
            _make_element(f"{track_ns}:GPSLatitude", "47.0"),
        ]
        elements2 = [
            _make_element(f"{track_ns}:GPSLongitude", "8.1"),
            _make_element(f"{track_ns}:GPSLatitude", "47.1"),
        ]
        sample_iterator = [
            (2.0, 1.0, elements2),
            (0.0, 1.0, elements1),
        ]
        track = _aggregate_gps_track_by_sample_time(
            sample_iterator,
            lon_tag=f"{track_ns}:GPSLongitude",
            lat_tag=f"{track_ns}:GPSLatitude",
        )
        assert len(track) == 2
        assert track[0].time < track[1].time
+
+
+# ---------------------------------------------------------------------------
+# ExifToolReadVideo.__init__
+# ---------------------------------------------------------------------------
+
+
class TestExifToolReadVideoInit:
    """Constructing ExifToolReadVideo from several camera XML dumps."""

    def test_init_from_blackvue_xml(self):
        tree = _etree_from_xml(BLACKVUE_XML)
        video_reader = ExifToolReadVideo(tree)
        assert video_reader.etree is tree
        # Internal tag indexes built in __init__ must not be empty
        assert len(video_reader._texts_by_tag) > 0
        assert len(video_reader._all_tags) > 0

    def test_init_from_gopro_xml(self):
        tree = _etree_from_xml(GOPRO_XML)
        assert ExifToolReadVideo(tree).etree is tree

    def test_init_from_insta360_xml(self):
        tree = _etree_from_xml(INSTA360_XML)
        assert ExifToolReadVideo(tree).etree is tree
+
+
+# ---------------------------------------------------------------------------
+# ExifToolReadVideo.extract_make / extract_model / _extract_make_and_model
+# ---------------------------------------------------------------------------
+
+
class TestExtractMakeAndModel:
    """Tests for extract_make / extract_model lookup priority and defaults.

    NOTE(review): the inline ``xml = ...`` literals below appear garbled —
    the XML markup seems to have been stripped in transit, leaving only tag
    text content. Restore the full RDF/ExifTool markup before trusting
    these cases; verify against the _etree_from_xml helper.
    """

    def test_blackvue_make_and_model(self):
        etree = _etree_from_xml(BLACKVUE_XML)
        reader = ExifToolReadVideo(etree)
        assert reader.extract_make() == "BlackVue"
        assert reader.extract_model() == "DR900S-2CH"

    def test_gopro_make_and_model(self):
        etree = _etree_from_xml(GOPRO_XML)
        reader = ExifToolReadVideo(etree)
        assert reader.extract_make() == "GoPro"
        assert reader.extract_model() == "GoPro Max"

    def test_insta360_make_and_model(self):
        etree = _etree_from_xml(INSTA360_XML)
        reader = ExifToolReadVideo(etree)
        assert reader.extract_make() == "Insta360"
        assert reader.extract_model() == "Insta360 X3"

    def test_gopro_make_defaults_when_missing(self):
        """If GoPro:Model is present but GoPro:Make is missing, make defaults to 'GoPro'."""
        xml = """\



 HERO11


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_make() == "GoPro"
        assert reader.extract_model() == "HERO11"

    def test_insta360_make_defaults_when_missing(self):
        """If Insta360:Model is present but Insta360:Make is missing, make defaults to 'Insta360'."""
        xml = """\



 ONE RS


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_make() == "Insta360"
        assert reader.extract_model() == "ONE RS"

    def test_no_make_no_model(self):
        """When no make/model tags exist, both return None."""
        xml = """\




"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_make() is None
        assert reader.extract_model() is None

    def test_make_with_whitespace_stripped(self):
        # Values surrounded by whitespace should come back stripped.
        xml = """\



 SomeMake
 SomeModel


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_make() == "SomeMake"
        assert reader.extract_model() == "SomeModel"

    def test_gopro_takes_priority_over_ifd0(self):
        """GoPro namespace is checked first, so it takes priority over IFD0."""
        xml = """\



 GoPro
 HERO12
 OtherMake
 OtherModel


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_make() == "GoPro"
        assert reader.extract_model() == "HERO12"

    def test_userdata_make_fallback(self):
        """UserData:Make is used when IFD0:Make is not present."""
        xml = """\



 UserDataMake
 UserDataModel


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_make() == "UserDataMake"
        assert reader.extract_model() == "UserDataModel"

    def test_make_without_model(self):
        """IFD0:Make present but no model tag anywhere returns (make, None)."""
        xml = """\



 JustMake


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_make() == "JustMake"
        assert reader.extract_model() is None
+
+
+# ---------------------------------------------------------------------------
+# ExifToolReadVideo.extract_camera_uuid
+# ---------------------------------------------------------------------------
+
+
class TestExtractCameraUUID:
    """Tests for extract_camera_uuid: serial lookup priority and sanitization.

    NOTE(review): the inline ``xml = ...`` literals below appear garbled —
    markup seems to have been stripped in transit, leaving only text
    content. Restore the full RDF/ExifTool markup before trusting them.
    """

    def test_blackvue_serial(self):
        etree = _etree_from_xml(BLACKVUE_XML)
        reader = ExifToolReadVideo(etree)
        assert reader.extract_camera_uuid() == "BV900S123456"

    def test_gopro_serial(self):
        etree = _etree_from_xml(GOPRO_XML)
        reader = ExifToolReadVideo(etree)
        assert reader.extract_camera_uuid() == "C3456789012345"

    def test_insta360_serial(self):
        etree = _etree_from_xml(INSTA360_XML)
        reader = ExifToolReadVideo(etree)
        assert reader.extract_camera_uuid() == "ISN12345678"

    def test_no_serial_returns_none(self):
        # No serial tags anywhere -> None.
        xml = """\




"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_camera_uuid() is None

    def test_body_and_lens_serial_combined(self):
        # Body and lens serials are joined with an underscore.
        xml = """\



 BODY123
 LENS456


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_camera_uuid() == "BODY123_LENS456"

    def test_serial_with_special_chars_sanitized(self):
        """Serial numbers with non-alphanumeric characters are sanitized."""
        xml = """\



 SN-123_456


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        # sanitize_serial removes non-alphanumeric chars
        assert reader.extract_camera_uuid() == "SN123456"

    def test_empty_serial_after_sanitization_returns_none(self):
        """A serial that becomes empty after sanitization yields None."""
        xml = """\



 ---


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_camera_uuid() is None

    def test_gopro_serial_priority_over_generic(self):
        """GoPro:SerialNumber is checked before generic ExifIFD:SerialNumber."""
        xml = """\



 GPSERIAL
 EXIFSERIAL


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_camera_uuid() == "GPSERIAL"

    def test_dji_serial(self):
        xml = """\



 DJI123456


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_camera_uuid() == "DJI123456"

    def test_lens_serial_only(self):
        """Only lens serial, no body serial."""
        xml = """\



 LENS789


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_camera_uuid() == "LENS789"
+
+
+# ---------------------------------------------------------------------------
+# ExifToolReadVideo.extract_gps_track (integration)
+# ---------------------------------------------------------------------------
+
+
class TestExtractGpsTrack:
    """Integration-level tests for extract_gps_track across namespaces.

    NOTE(review): the inline ``xml = ...`` literals below appear garbled
    (markup stripped in transit); restore the real RDF markup before
    relying on those cases.
    """

    def test_blackvue_quicktime_gps(self):
        """BlackVue uses QuickTime namespace - the first path in extract_gps_track."""
        etree = _etree_from_xml(BLACKVUE_XML)
        reader = ExifToolReadVideo(etree)
        track = reader.extract_gps_track()
        assert len(track) == 3
        # Time should be normalized (starts at 0)
        assert track[0].time == pytest.approx(0.0)
        assert track[1].time == pytest.approx(1.0)
        assert track[2].time == pytest.approx(2.0)
        # Check coordinates of first point
        assert track[0].lat == pytest.approx(37.265547)
        assert track[0].lon == pytest.approx(28.213497)
        assert track[0].alt == pytest.approx(402.9)
        assert track[0].angle == pytest.approx(133.46)

    def test_insta360_gps(self):
        """Insta360 uses Insta360 namespace - the second path in extract_gps_track."""
        etree = _etree_from_xml(INSTA360_XML)
        reader = ExifToolReadVideo(etree)
        track = reader.extract_gps_track()
        assert len(track) == 2
        assert track[0].lat == pytest.approx(47.371)
        assert track[0].lon == pytest.approx(8.542)
        assert track[0].alt == pytest.approx(408.5)

    def test_gopro_track_gps(self):
        """GoPro uses Track namespace - the third path in extract_gps_track."""
        etree = _etree_from_xml(GOPRO_XML)
        reader = ExifToolReadVideo(etree)
        track = reader.extract_gps_track()
        assert len(track) == 2
        assert track[0].lat == pytest.approx(47.359832)
        assert track[0].lon == pytest.approx(8.522706)

    def test_empty_gps_track(self):
        """When no GPS data is present, returns empty list."""
        xml = """\



 Unknown


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        track = reader.extract_gps_track()
        assert track == []

    def test_quicktime_preferred_over_track(self):
        """If both QuickTime and Track GPS data exist, QuickTime is used."""
        xml = """\



 2019:09:02 10:23:28.00Z
 37.0
 28.0
 0
 1.0
 47.0
 8.0


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        track = reader.extract_gps_track()
        # QuickTime is preferred, so lat should be ~37, not ~47
        assert len(track) >= 1
        assert track[0].lat == pytest.approx(37.0)
+
+
+# ---------------------------------------------------------------------------
+# ExifToolReadVideo._extract_gps_track_from_quicktime
+# ---------------------------------------------------------------------------
+
+
class TestExtractGpsTrackFromQuicktime:
    """Tests for the QuickTime-namespace GPS extraction path.

    NOTE(review): the inline ``xml`` literal below appears garbled (markup
    stripped in transit); restore the real RDF markup before relying on it.
    """

    def test_missing_required_tags_returns_empty(self):
        """Without all three required tags, returns empty list."""
        xml = """\



 37.0
 28.0


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        # GPSDateTime is missing, so _extract_gps_track_from_quicktime returns []
        track = reader._extract_gps_track_from_quicktime()
        assert track == []

    def test_custom_namespace_insta360(self):
        # The same helper handles the Insta360 namespace when asked.
        etree = _etree_from_xml(INSTA360_XML)
        reader = ExifToolReadVideo(etree)
        track = reader._extract_gps_track_from_quicktime(namespace="Insta360")
        assert len(track) == 2
        assert track[0].lat == pytest.approx(47.371)

    def test_custom_namespace_not_present(self):
        """Using a namespace that has no data returns empty."""
        etree = _etree_from_xml(BLACKVUE_XML)
        reader = ExifToolReadVideo(etree)
        track = reader._extract_gps_track_from_quicktime(namespace="Insta360")
        assert track == []
+
+
+# ---------------------------------------------------------------------------
+# ExifToolReadVideo._extract_gps_track_from_track
+# ---------------------------------------------------------------------------
+
+
class TestExtractGpsTrackFromTrack:
    """Tests for the TrackN-namespace GPS extraction path.

    NOTE(review): the inline ``xml`` literals below appear garbled (markup
    stripped in transit); restore the real RDF markup before relying on
    those cases.
    """

    def test_gopro_track_data(self):
        etree = _etree_from_xml(GOPRO_XML)
        reader = ExifToolReadVideo(etree)
        track = reader._extract_gps_track_from_track()
        assert len(track) == 2
        assert track[0].lat == pytest.approx(47.359832)
        assert track[0].lon == pytest.approx(8.522706)
        assert track[0].alt == pytest.approx(414.9)
        # GPSMeasureMode=3 -> FIX_3D
        assert track[0].fix == GPSFix.FIX_3D
        # GPSHPositioningError=2.19 -> 219.0 after *100
        assert track[0].precision == pytest.approx(219.0)

    def test_no_track_data_returns_empty(self):
        etree = _etree_from_xml(BLACKVUE_XML)
        reader = ExifToolReadVideo(etree)
        # BlackVue doesn't have Track namespace data
        track = reader._extract_gps_track_from_track()
        assert track == []

    def test_track2_namespace(self):
        """GPS data in Track2 (not Track1) is still found."""
        xml = """\



 0
 1.0
 47.0
 8.0


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        track = reader._extract_gps_track_from_track()
        assert len(track) == 1
        assert track[0].lat == pytest.approx(47.0)

    def test_track_with_incomplete_tags_skipped(self):
        """Track namespace missing SampleTime/SampleDuration is skipped."""
        xml = """\



 47.0
 8.0


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        track = reader._extract_gps_track_from_track()
        assert track == []
+
+
+# ---------------------------------------------------------------------------
+# Edge cases
+# ---------------------------------------------------------------------------
+
+
class TestEdgeCases:
    """Edge cases: single point, duplicate dedup, empty description.

    NOTE(review): the inline ``xml`` literals below appear garbled (markup
    stripped in transit); restore the real RDF markup before relying on
    these cases.
    """

    def test_single_gps_point(self):
        """Single GPS point still works."""
        xml = """\



 2019:09:02 10:23:28.00Z
 37.0
 28.0


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        track = reader.extract_gps_track()
        assert len(track) == 1
        assert track[0].time == pytest.approx(0.0)

    def test_duplicate_points_removed(self):
        """Consecutive identical GPS points are deduplicated."""
        xml = """\



 2019:09:02 10:23:28.00Z
 37.0
 28.0
 2019:09:02 10:23:28.00Z
 37.0
 28.0


"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        track = reader.extract_gps_track()
        assert len(track) == 1

    def test_rdf_description_with_no_children(self):
        # A bare rdf:Description yields no data for any extractor.
        xml = """\




"""
        reader = ExifToolReadVideo(_etree_from_xml(xml))
        assert reader.extract_gps_track() == []
        assert reader.extract_make() is None
        assert reader.extract_model() is None
        assert reader.extract_camera_uuid() is None
diff --git a/tests/unit/test_gpmf_parser.py b/tests/unit/test_gpmf_parser.py
index ca372758..bae30ce1 100644
--- a/tests/unit/test_gpmf_parser.py
+++ b/tests/unit/test_gpmf_parser.py
@@ -3,11 +3,1152 @@
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
+import datetime
+import os
+from pathlib import Path
+
+import pytest
+
+from mapillary_tools import telemetry
from mapillary_tools.gpmf import gpmf_parser
-def test_simple():
- x = gpmf_parser.KLV.parse(b"DEMO\x02\x01\x00\x01\xff\x00\x00\x00")
- x = gpmf_parser.GPMFSampleData.parse(
- b"DEM1\x01\x01\x00\x01\xff\x00\x00\x00DEM2\x03\x00\x00\x01"
- )
+# ---------------------------------------------------------------------------
+# Test data file paths (used for integration-style tests)
+# ---------------------------------------------------------------------------
# Absolute paths to local sample videos; integration tests are expected to
# be skipped when these files are absent (see the _has_* flags below).
GPS5_VIDEO = Path(
    "/tmp/mly_coverage_test/data/mly_test_data/GoPro MAX (works)/video 24 FPS/GS011498.360"
)
GPS9_VIDEO = Path(
    "/tmp/mly_coverage_test/data/mly_test_data/GoPro MAX 2 (works)/360 Video (including lower res)/GS015637.360"
)
HERO7_VIDEO = Path(
    "/tmp/mly_coverage_test/data/mly_test_data/GoPro Hero 7 (works)/GH010359.MP4"
)

# Availability flags evaluated once at import time.
_has_gps5_video = GPS5_VIDEO.exists()
_has_gps9_video = GPS9_VIDEO.exists()
_has_hero7_video = HERO7_VIDEO.exists()
+
+
+# ---------------------------------------------------------------------------
+# Helpers to construct realistic KLVDict structures
+# ---------------------------------------------------------------------------
+def _make_klv(
+ key: bytes, type_char: bytes, data, structure_size: int = 0, repeat: int = 0
+):
+ """Build a minimal KLVDict-like dict with the fields the parser uses."""
+ return {
+ "key": key,
+ "type": type_char,
+ "structure_size": structure_size,
+ "repeat": repeat,
+ "data": data,
+ }
+
+
+# ---------------------------------------------------------------------------
+# 1. _gps5_timestamp_to_epoch_time
+# ---------------------------------------------------------------------------
class TestGps5TimestampToEpochTime:
    """GPSU strings ('YYMMDDhhmmss.sss') must convert to UTC epoch seconds."""

    def test_known_timestamp(self):
        # GPSU from the GoPro MAX file: '230117115504.225' means 2023-01-17 11:55:04.225 UTC
        epoch = gpmf_parser._gps5_timestamp_to_epoch_time("230117115504.225")
        dt = datetime.datetime(
            2023, 1, 17, 11, 55, 4, 225000, tzinfo=datetime.timezone.utc
        )
        assert epoch == pytest.approx(dt.timestamp(), abs=0.001)

    def test_midnight_timestamp(self):
        epoch = gpmf_parser._gps5_timestamp_to_epoch_time("230101000000.000")
        dt = datetime.datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=datetime.timezone.utc)
        assert epoch == pytest.approx(dt.timestamp(), abs=0.001)

    def test_end_of_year(self):
        # Boundary: last millisecond of the year.
        epoch = gpmf_parser._gps5_timestamp_to_epoch_time("231231235959.999")
        dt = datetime.datetime(
            2023, 12, 31, 23, 59, 59, 999000, tzinfo=datetime.timezone.utc
        )
        assert epoch == pytest.approx(dt.timestamp(), abs=0.001)
+
+
+# ---------------------------------------------------------------------------
+# 2. _gps9_timestamp_to_epoch_time
+# ---------------------------------------------------------------------------
class TestGps9TimestampToEpochTime:
    """GPS9 (days since 2000-01-01, seconds since midnight) -> UTC epoch."""

    def test_known_values_from_real_data(self):
        # days_since_2000=9463, secs_since_midnight=44896.0 -> 2025-11-28 12:28:16 UTC
        epoch = gpmf_parser._gps9_timestamp_to_epoch_time(9463, 44896.0)
        assert epoch == pytest.approx(1764332896.0, abs=1.0)

    def test_epoch_at_2000(self):
        # 0 days, 0 secs -> 2000-01-01T00:00:00 UTC
        epoch = gpmf_parser._gps9_timestamp_to_epoch_time(0, 0.0)
        dt_2000 = datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc)
        assert epoch == pytest.approx(dt_2000.timestamp(), abs=0.001)

    def test_one_day_after_2000(self):
        epoch = gpmf_parser._gps9_timestamp_to_epoch_time(1, 0.0)
        dt = datetime.datetime(2000, 1, 2, tzinfo=datetime.timezone.utc)
        assert epoch == pytest.approx(dt.timestamp(), abs=0.001)

    def test_fractional_seconds(self):
        # 0 days, 3600.5 secs -> 2000-01-01T01:00:00.5 UTC
        epoch = gpmf_parser._gps9_timestamp_to_epoch_time(0, 3600.5)
        dt = datetime.datetime(
            2000, 1, 1, 1, 0, 0, 500000, tzinfo=datetime.timezone.utc
        )
        assert epoch == pytest.approx(dt.timestamp(), abs=0.001)
+
+
+# ---------------------------------------------------------------------------
+# 3. _get_gps_type
+# ---------------------------------------------------------------------------
class TestGetGpsType:
    """_get_gps_type must flatten (possibly nested) TYPE byte fragments."""

    def test_flat_bytes_list(self):
        # TYPE data from real GPS9 stream: [b'lllllllSS']
        result = gpmf_parser._get_gps_type([b"lllllllSS"])
        assert result == b"lllllllSS"

    def test_nested_bytes(self):
        # e.g., [b'll', [b'SS', b'bb']]
        result = gpmf_parser._get_gps_type([b"ll", [b"SS", b"bb"]])
        assert result == b"llSSbb"

    def test_empty_input(self):
        result = gpmf_parser._get_gps_type([])
        assert result == b""

    def test_none_input(self):
        # None is tolerated and treated like "no type info".
        result = gpmf_parser._get_gps_type(None)
        assert result == b""

    def test_unexpected_type_raises(self):
        # Non-bytes leaves (e.g. ints) are rejected loudly.
        with pytest.raises(ValueError, match="Unexpected type"):
            gpmf_parser._get_gps_type([123])

    def test_deeply_nested(self):
        result = gpmf_parser._get_gps_type([[b"a", [b"b"]]])
        assert result == b"ab"
+
+
+# ---------------------------------------------------------------------------
+# 4. _gps5_from_stream - unit tests using constructed KLVDict data
+# ---------------------------------------------------------------------------
class TestGps5FromStream:
    """Unit tests for _gps5_from_stream using hand-built KLV streams."""

    def _make_gps5_stream(
        self,
        gps5_data=None,
        scal_data=None,
        gpsf_data=None,
        gpsu_data=None,
        gpsp_data=None,
    ):
        """Build a GPS5 STRM stream with realistic structure.

        Any argument left as None simply omits that KLV from the stream,
        letting each test exercise a specific missing-key path.
        """
        stream = []
        if gpsf_data is not None:
            stream.append(_make_klv(b"GPSF", b"L", gpsf_data))
        if gpsu_data is not None:
            stream.append(_make_klv(b"GPSU", b"U", gpsu_data))
        if gpsp_data is not None:
            stream.append(_make_klv(b"GPSP", b"S", gpsp_data))
        if scal_data is not None:
            stream.append(_make_klv(b"SCAL", b"l", scal_data))
        if gps5_data is not None:
            stream.append(_make_klv(b"GPS5", b"l", gps5_data))
        return stream

    def test_basic_gps5_parsing(self):
        """Parse two GPS5 points from the GoPro MAX real data."""
        # Real values from the GoPro MAX file
        stream = self._make_gps5_stream(
            gps5_data=[
                [473598318, 85227055, 414870, 891, 119],
                [473598322, 85227045, 414950, 1064, 95],
            ],
            scal_data=[[10000000], [10000000], [1000], [1000], [100]],
            gpsf_data=[[3]],
            gpsu_data=[[b"230117115504.225"]],
            gpsp_data=[[219]],
        )
        points = list(gpmf_parser._gps5_from_stream(stream))
        assert len(points) == 2

        p0 = points[0]
        assert p0.lat == pytest.approx(47.3598318, abs=1e-7)
        assert p0.lon == pytest.approx(8.5227055, abs=1e-7)
        assert p0.alt == pytest.approx(414.87, abs=0.01)
        assert p0.ground_speed == pytest.approx(0.891, abs=0.001)
        assert p0.fix == telemetry.GPSFix.FIX_3D
        assert p0.precision == 219
        assert p0.epoch_time is not None
        assert p0.time == 0  # time is always 0 from _gps5_from_stream
        assert p0.angle is None

        p1 = points[1]
        assert p1.lat == pytest.approx(47.3598322, abs=1e-7)
        assert p1.lon == pytest.approx(8.5227045, abs=1e-7)

    def test_no_gps5_key(self):
        """Stream without GPS5 key yields nothing."""
        stream = self._make_gps5_stream(
            scal_data=[[10000000], [10000000], [1000], [1000], [100]],
        )
        points = list(gpmf_parser._gps5_from_stream(stream))
        assert points == []

    def test_no_scal_key(self):
        """Stream without SCAL key yields nothing."""
        stream = self._make_gps5_stream(
            gps5_data=[[473598318, 85227055, 414870, 891, 119]],
        )
        points = list(gpmf_parser._gps5_from_stream(stream))
        assert points == []

    def test_zero_scale_value(self):
        """Zero in SCAL causes early return (avoids division by zero)."""
        stream = self._make_gps5_stream(
            gps5_data=[[473598318, 85227055, 414870, 891, 119]],
            scal_data=[[10000000], [0], [1000], [1000], [100]],
        )
        points = list(gpmf_parser._gps5_from_stream(stream))
        assert points == []

    def test_no_gpsf(self):
        """Missing GPSF -> fix is None."""
        stream = self._make_gps5_stream(
            gps5_data=[[473598318, 85227055, 414870, 891, 119]],
            scal_data=[[10000000], [10000000], [1000], [1000], [100]],
        )
        points = list(gpmf_parser._gps5_from_stream(stream))
        assert len(points) == 1
        assert points[0].fix is None

    def test_gpsf_no_lock(self):
        """GPSF=0 means no lock."""
        stream = self._make_gps5_stream(
            gps5_data=[[473598318, 85227055, 414870, 891, 119]],
            scal_data=[[10000000], [10000000], [1000], [1000], [100]],
            gpsf_data=[[0]],
        )
        points = list(gpmf_parser._gps5_from_stream(stream))
        assert len(points) == 1
        assert points[0].fix == telemetry.GPSFix.NO_FIX

    def test_gpsf_2d_lock(self):
        """GPSF=2 means 2D lock."""
        stream = self._make_gps5_stream(
            gps5_data=[[473598318, 85227055, 414870, 891, 119]],
            scal_data=[[10000000], [10000000], [1000], [1000], [100]],
            gpsf_data=[[2]],
        )
        points = list(gpmf_parser._gps5_from_stream(stream))
        assert points[0].fix == telemetry.GPSFix.FIX_2D

    def test_no_gpsu(self):
        """Missing GPSU -> epoch_time is None."""
        stream = self._make_gps5_stream(
            gps5_data=[[473598318, 85227055, 414870, 891, 119]],
            scal_data=[[10000000], [10000000], [1000], [1000], [100]],
        )
        points = list(gpmf_parser._gps5_from_stream(stream))
        assert len(points) == 1
        assert points[0].epoch_time is None

    def test_invalid_gpsu(self):
        """Invalid GPSU string -> epoch_time is None (exception caught)."""
        stream = self._make_gps5_stream(
            gps5_data=[[473598318, 85227055, 414870, 891, 119]],
            scal_data=[[10000000], [10000000], [1000], [1000], [100]],
            gpsu_data=[[b"invalid_gpsu_str"]],
        )
        points = list(gpmf_parser._gps5_from_stream(stream))
        assert len(points) == 1
        assert points[0].epoch_time is None

    def test_no_gpsp(self):
        """Missing GPSP -> precision is None."""
        stream = self._make_gps5_stream(
            gps5_data=[[473598318, 85227055, 414870, 891, 119]],
            scal_data=[[10000000], [10000000], [1000], [1000], [100]],
        )
        points = list(gpmf_parser._gps5_from_stream(stream))
        assert len(points) == 1
        assert points[0].precision is None

    def test_epoch_time_same_for_all_points_in_sample(self):
        """All points in the same GPS5 sample share the same epoch_time."""
        stream = self._make_gps5_stream(
            gps5_data=[
                [473598318, 85227055, 414870, 891, 119],
                [473598322, 85227045, 414950, 1064, 95],
                [473598323, 85227036, 415033, 1139, 111],
            ],
            scal_data=[[10000000], [10000000], [1000], [1000], [100]],
            gpsu_data=[[b"230117115504.225"]],
        )
        points = list(gpmf_parser._gps5_from_stream(stream))
        assert len(points) == 3
        # All should have the same epoch_time
        assert points[0].epoch_time == points[1].epoch_time == points[2].epoch_time

    def test_negative_coordinates(self):
        """Negative GPS coordinates (southern/western hemisphere)."""
        # Hero 7 data: lat=34.3985866, lon=-119.6986334 (negative raw lon)
        stream = self._make_gps5_stream(
            gps5_data=[[343985866, -1196986334, -34141, 313, 50]],
            scal_data=[[10000000], [10000000], [1000], [1000], [100]],
            gpsf_data=[[3]],
        )
        points = list(gpmf_parser._gps5_from_stream(stream))
        assert len(points) == 1
        assert points[0].lat == pytest.approx(34.3985866, abs=1e-7)
        assert points[0].lon == pytest.approx(-119.6986334, abs=1e-7)
        assert points[0].alt == pytest.approx(-34.141, abs=0.001)
+
+
+# ---------------------------------------------------------------------------
+# 5. _gps9_from_stream - unit tests
+# ---------------------------------------------------------------------------
+class TestGps9FromStream:
    def _make_gps9_stream(
        self,
        gps9_data=None,
        scal_data=None,
        type_data=None,
    ):
        """Build a GPS9 STRM stream with realistic structure.

        Arguments left as None omit that KLV so each test can exercise a
        specific missing-key path.
        """
        stream = []
        if type_data is not None:
            stream.append(_make_klv(b"TYPE", b"c", type_data))
        if scal_data is not None:
            stream.append(_make_klv(b"SCAL", b"l", scal_data))
        if gps9_data is not None:
            stream.append(_make_klv(b"GPS9", b"?", gps9_data))
        return stream
+
+ def _build_gps9_sample_bytes(
+ self, lat, lon, alt, speed2d, speed3d, days, secs_ms, dop, fix
+ ):
+ """Encode raw GPS9 values as bytes using the 'lllllllSS' format."""
+ import struct
+
+ return struct.pack(
+ ">iiiiiiiHH",
+ lat,
+ lon,
+ alt,
+ speed2d,
+ speed3d,
+ days,
+ secs_ms,
+ dop,
+ fix,
+ )
+
    def test_basic_gps9_parsing(self):
        """Parse a GPS9 point from GoPro MAX2 real data."""
        # Real bytes from the file
        sample_bytes = bytes.fromhex(
            "1e71d2c703b6242500014dce000001270000002a000024f702ad0f0000b90003"
        )

        # SCAL carries one divisor per GPS9 field (9 entries total).
        stream = self._make_gps9_stream(
            gps9_data=[sample_bytes],
            scal_data=[
                [10000000],
                [10000000],
                [1000],
                [1000],
                [100],
                [1],
                [1000],
                [100],
                [1],
            ],
            type_data=[b"lllllllSS"],
        )
        points = list(gpmf_parser._gps9_from_stream(stream))
        assert len(points) == 1

        p = points[0]
        assert p.lat == pytest.approx(51.0776007, abs=1e-6)
        assert p.lon == pytest.approx(6.2268453, abs=1e-6)
        assert p.alt == pytest.approx(85.454, abs=0.01)
        assert p.ground_speed == pytest.approx(0.295, abs=0.001)
        assert p.fix == telemetry.GPSFix.FIX_3D
        assert p.precision == pytest.approx(185.0, abs=0.1)
        assert p.epoch_time == pytest.approx(1764332896.0, abs=1.0)
        assert p.time == 0
        assert p.angle is None
+
    def test_no_gps9_key(self):
        # Without the GPS9 payload the stream yields nothing.
        stream = self._make_gps9_stream(
            scal_data=[
                [10000000],
                [10000000],
                [1000],
                [1000],
                [100],
                [1],
                [1000],
                [100],
                [1],
            ],
            type_data=[b"lllllllSS"],
        )
        points = list(gpmf_parser._gps9_from_stream(stream))
        assert points == []
+
+ def test_no_scal_key(self):
+ sample_bytes = self._build_gps9_sample_bytes(
+ 510776007, 62268453, 85454, 295, 42, 9463, 44896000, 185, 3
+ )
+ stream = self._make_gps9_stream(
+ gps9_data=[sample_bytes],
+ type_data=[b"lllllllSS"],
+ )
+ points = list(gpmf_parser._gps9_from_stream(stream))
+ assert points == []
+
+ def test_no_type_key(self):
+ sample_bytes = self._build_gps9_sample_bytes(
+ 510776007, 62268453, 85454, 295, 42, 9463, 44896000, 185, 3
+ )
+ stream = self._make_gps9_stream(
+ gps9_data=[sample_bytes],
+ scal_data=[
+ [10000000],
+ [10000000],
+ [1000],
+ [1000],
+ [100],
+ [1],
+ [1000],
+ [100],
+ [1],
+ ],
+ )
+ points = list(gpmf_parser._gps9_from_stream(stream))
+ assert points == []
+
+ def test_zero_scale_value(self):
+ sample_bytes = self._build_gps9_sample_bytes(
+ 510776007, 62268453, 85454, 295, 42, 9463, 44896000, 185, 3
+ )
+ stream = self._make_gps9_stream(
+ gps9_data=[sample_bytes],
+ scal_data=[[10000000], [0], [1000], [1000], [100], [1], [1000], [100], [1]],
+ type_data=[b"lllllllSS"],
+ )
+ points = list(gpmf_parser._gps9_from_stream(stream))
+ assert points == []
+
+ def test_wrong_type_length_raises(self):
+ sample_bytes = self._build_gps9_sample_bytes(
+ 510776007, 62268453, 85454, 295, 42, 9463, 44896000, 185, 3
+ )
+ stream = self._make_gps9_stream(
+ gps9_data=[sample_bytes],
+ scal_data=[
+ [10000000],
+ [10000000],
+ [1000],
+ [1000],
+ [100],
+ [1],
+ [1000],
+ [100],
+ [1],
+ ],
+ type_data=[b"llll"], # only 4 types instead of 9
+ )
+ with pytest.raises(ValueError, match="expect 9 types"):
+ list(gpmf_parser._gps9_from_stream(stream))
+
+ def test_multiple_gps9_samples(self):
+ """Multiple GPS9 samples in one stream."""
+ s1 = self._build_gps9_sample_bytes(
+ 510776007, 62268453, 85454, 295, 42, 9463, 44896000, 185, 3
+ )
+ s2 = self._build_gps9_sample_bytes(
+ 510776005, 62268463, 85505, 300, 45, 9463, 44897000, 190, 3
+ )
+ stream = self._make_gps9_stream(
+ gps9_data=[s1, s2],
+ scal_data=[
+ [10000000],
+ [10000000],
+ [1000],
+ [1000],
+ [100],
+ [1],
+ [1000],
+ [100],
+ [1],
+ ],
+ type_data=[b"lllllllSS"],
+ )
+ points = list(gpmf_parser._gps9_from_stream(stream))
+ assert len(points) == 2
+ # Each point should have its own epoch_time
+ assert points[0].epoch_time != points[1].epoch_time
+ assert points[1].epoch_time == pytest.approx(
+ gpmf_parser._gps9_timestamp_to_epoch_time(9463, 44897.0), abs=0.01
+ )
+
+
+# ---------------------------------------------------------------------------
+# 6. _find_first_device_id
+# ---------------------------------------------------------------------------
+class TestFindFirstDeviceId:
+    """Tests for _find_first_device_id: returns first DVID, else the 2**32 sentinel."""
+    def test_dvid_present(self):
+        stream = [
+            _make_klv(b"DVID", b"L", [[1]]),
+            _make_klv(b"DVNM", b"c", [b"GoPro Max"]),
+        ]
+        assert gpmf_parser._find_first_device_id(stream) == 1
+
+    def test_dvid_large_value(self):
+        stream = [
+            _make_klv(b"DVID", b"L", [[4294967295]]),  # max uint32
+        ]
+        assert gpmf_parser._find_first_device_id(stream) == 4294967295
+
+    def test_no_dvid_returns_default(self):
+        stream = [
+            _make_klv(b"DVNM", b"c", [b"GoPro Max"]),
+        ]
+        device_id = gpmf_parser._find_first_device_id(stream)
+        assert device_id == 2**32  # sentinel: one past the uint32 range
+
+    def test_empty_stream(self):
+        device_id = gpmf_parser._find_first_device_id([])
+        assert device_id == 2**32
+
+    def test_dvid_first_wins(self):
+        """If multiple DVID entries exist, the first one is used."""
+        stream = [
+            _make_klv(b"DVID", b"L", [[5]]),
+            _make_klv(b"DVID", b"L", [[10]]),
+        ]
+        assert gpmf_parser._find_first_device_id(stream) == 5
+
+
+# ---------------------------------------------------------------------------
+# 7. _find_first_gps_stream - prefers GPS9 over GPS5
+# ---------------------------------------------------------------------------
+class TestFindFirstGpsStream:
+    """Tests for _find_first_gps_stream: scans STRM entries, GPS9 before GPS5."""
+    def test_gps5_stream_found(self):
+        gps5_strm = _make_klv(
+            b"STRM",
+            b"\x00",
+            [
+                _make_klv(
+                    b"SCAL", b"l", [[10000000], [10000000], [1000], [1000], [100]]
+                ),
+                _make_klv(b"GPS5", b"l", [[473598318, 85227055, 414870, 891, 119]]),
+                _make_klv(b"GPSF", b"L", [[3]]),
+            ],
+        )
+        points = gpmf_parser._find_first_gps_stream([gps5_strm])
+        assert len(points) == 1
+        assert points[0].lat == pytest.approx(47.3598318, abs=1e-7)  # raw / 1e7
+
+    def test_empty_stream(self):
+        points = gpmf_parser._find_first_gps_stream([])
+        assert points == []
+
+    def test_no_strm_key(self):
+        """Entries that are not STRM are ignored."""
+        non_strm = _make_klv(b"DVNM", b"c", [b"GoPro Max"])
+        points = gpmf_parser._find_first_gps_stream([non_strm])
+        assert points == []
+
+    def test_gps9_preferred_over_gps5(self):
+        """GPS9 is tried first within each STRM; GPS5 is fallback."""
+        import struct
+
+        sample_bytes = struct.pack(
+            ">iiiiiiiHH",  # same 'lllllllSS' layout used by GPS9
+            510776007,
+            62268453,
+            85454,
+            295,
+            42,
+            9463,
+            44896000,
+            185,
+            3,
+        )
+        gps9_strm = _make_klv(
+            b"STRM",
+            b"\x00",
+            [
+                _make_klv(b"TYPE", b"c", [b"lllllllSS"]),
+                _make_klv(
+                    b"SCAL",
+                    b"l",
+                    [
+                        [10000000],
+                        [10000000],
+                        [1000],
+                        [1000],
+                        [100],
+                        [1],
+                        [1000],
+                        [100],
+                        [1],
+                    ],
+                ),
+                _make_klv(b"GPS9", b"?", [sample_bytes]),
+            ],
+        )
+        points = gpmf_parser._find_first_gps_stream([gps9_strm])
+        assert len(points) == 1
+        assert points[0].lat == pytest.approx(51.0776007, abs=1e-6)
+
+    def test_gps5_fallback_when_gps9_missing(self):
+        """Falls back to GPS5 if GPS9 is not found in stream."""
+        gps5_strm = _make_klv(
+            b"STRM",
+            b"\x00",
+            [
+                _make_klv(
+                    b"SCAL", b"l", [[10000000], [10000000], [1000], [1000], [100]]
+                ),
+                _make_klv(b"GPS5", b"l", [[343985866, -1196986334, -34141, 313, 50]]),
+                _make_klv(b"GPSF", b"L", [[3]]),
+            ],
+        )
+        points = gpmf_parser._find_first_gps_stream([gps5_strm])
+        assert len(points) == 1
+        assert points[0].lat == pytest.approx(34.3985866, abs=1e-7)
+        assert points[0].lon == pytest.approx(-119.6986334, abs=1e-7)
+
+
+# ---------------------------------------------------------------------------
+# 8. _extract_camera_model_from_devices
+# ---------------------------------------------------------------------------
+class TestExtractCameraModelFromDevices:
+    """Tests for _extract_camera_model_from_devices: picks 'hero' > 'gopro' > first name."""
+    def test_empty_device_names(self):
+        assert gpmf_parser._extract_camera_model_from_devices({}) == ""
+
+    def test_single_device(self):
+        result = gpmf_parser._extract_camera_model_from_devices({1: b"GoPro Max"})
+        assert result == "GoPro Max"
+
+    def test_hero_priority(self):
+        """Device with 'hero' in the name gets higher priority."""
+        result = gpmf_parser._extract_camera_model_from_devices(
+            {
+                1: b"GoPro Max",
+                2: b"Hero7 Black",
+            }
+        )
+        assert result == "Hero7 Black"
+
+    def test_gopro_priority_over_other(self):
+        """Device with 'gopro' gets priority if no 'hero' device."""
+        result = gpmf_parser._extract_camera_model_from_devices(
+            {
+                1: b"SomeOtherCam",
+                2: b"GoPro Fusion",
+            }
+        )
+        assert result == "GoPro Fusion"
+
+    def test_first_alphabetically_if_no_hero_or_gopro(self):
+        result = gpmf_parser._extract_camera_model_from_devices(
+            {
+                1: b"ZCam",
+                2: b"ACam",
+            }
+        )
+        assert result == "ACam"
+
+    def test_whitespace_stripped(self):
+        result = gpmf_parser._extract_camera_model_from_devices(
+            {
+                1: b"  GoPro Max  ",  # surrounding whitespace must be stripped
+            }
+        )
+        assert result == "GoPro Max"
+
+    def test_unicode_decode_error_skipped(self):
+        """Devices with invalid UTF-8 names are skipped."""
+        result = gpmf_parser._extract_camera_model_from_devices(
+            {
+                1: b"\xff\xfe",  # not valid UTF-8
+                2: b"GoPro Max",
+            }
+        )
+        assert result == "GoPro Max"
+
+    def test_all_invalid_unicode(self):
+        """All devices with invalid UTF-8 returns empty string."""
+        result = gpmf_parser._extract_camera_model_from_devices(
+            {
+                1: b"\xff\xfe",
+                2: b"\x80\x81",
+            }
+        )
+        assert result == ""
+
+    def test_hero_case_insensitive(self):
+        result = gpmf_parser._extract_camera_model_from_devices(
+            {
+                1: b"MAX2",
+                2: b"HERO12 Black",
+            }
+        )
+        assert result == "HERO12 Black"
+
+    def test_gopro_case_insensitive(self):
+        result = gpmf_parser._extract_camera_model_from_devices(
+            {
+                1: b"SomeCam",
+                2: b"GOPRO MAX",
+            }
+        )
+        assert result == "GOPRO MAX"
+
+    def test_real_max_model(self):
+        """GoPro Max has 'GoPro Max' device name (contains 'gopro', not 'hero')."""
+        result = gpmf_parser._extract_camera_model_from_devices({1: b"GoPro Max"})
+        assert result == "GoPro Max"
+
+    def test_real_max2_model(self):
+        """MAX2 has no 'hero' or 'gopro' in the name."""
+        result = gpmf_parser._extract_camera_model_from_devices({1: b"MAX2"})
+        assert result == "MAX2"
+
+
+# ---------------------------------------------------------------------------
+# 9. _backfill_gps_timestamps
+# ---------------------------------------------------------------------------
+class TestBackfillGpsTimestamps:
+    """Tests for _backfill_gps_timestamps: forward-fills missing epoch_time in place."""
+    def _make_point(self, time, epoch_time=None):
+        # Minimal GPSPoint; only time/epoch_time vary across tests
+        return telemetry.GPSPoint(
+            time=time,
+            lat=47.36,
+            lon=8.52,
+            alt=400.0,
+            epoch_time=epoch_time,
+            fix=telemetry.GPSFix.FIX_3D,
+            precision=200,
+            ground_speed=1.0,
+            angle=None,
+        )
+
+    def test_all_have_epoch_time(self):
+        """No backfilling needed when all points have epoch_time."""
+        pts = [
+            self._make_point(0.0, 1000.0),
+            self._make_point(1.0, 1001.0),
+            self._make_point(2.0, 1002.0),
+        ]
+        gpmf_parser._backfill_gps_timestamps(pts)
+        assert pts[0].epoch_time == 1000.0
+        assert pts[1].epoch_time == 1001.0
+        assert pts[2].epoch_time == 1002.0
+
+    def test_backfill_forward(self):
+        """Points after the first with epoch_time get backfilled."""
+        pts = [
+            self._make_point(0.0, 1000.0),
+            self._make_point(1.0, None),
+            self._make_point(2.0, None),
+        ]
+        gpmf_parser._backfill_gps_timestamps(pts)
+        assert pts[0].epoch_time == 1000.0
+        assert pts[1].epoch_time == pytest.approx(1001.0)  # anchor + time delta
+        assert pts[2].epoch_time == pytest.approx(1002.0)
+
+    def test_backfill_backward_with_reversed(self):
+        """Backfill backward by calling with reversed()."""
+        pts = [
+            self._make_point(0.0, None),
+            self._make_point(1.0, None),
+            self._make_point(2.0, 1002.0),
+        ]
+        gpmf_parser._backfill_gps_timestamps(reversed(pts))
+        assert pts[0].epoch_time == pytest.approx(1000.0)
+        assert pts[1].epoch_time == pytest.approx(1001.0)
+        assert pts[2].epoch_time == 1002.0
+
+    def test_no_points_with_epoch_time(self):
+        """No crash when no points have epoch_time."""
+        pts = [
+            self._make_point(0.0, None),
+            self._make_point(1.0, None),
+        ]
+        gpmf_parser._backfill_gps_timestamps(pts)
+        assert pts[0].epoch_time is None
+        assert pts[1].epoch_time is None
+
+    def test_empty_list(self):
+        """No crash on empty list."""
+        gpmf_parser._backfill_gps_timestamps([])
+
+    def test_single_point_with_epoch(self):
+        pts = [self._make_point(0.0, 1000.0)]
+        gpmf_parser._backfill_gps_timestamps(pts)
+        assert pts[0].epoch_time == 1000.0
+
+    def test_single_point_without_epoch(self):
+        pts = [self._make_point(0.0, None)]
+        gpmf_parser._backfill_gps_timestamps(pts)
+        assert pts[0].epoch_time is None
+
+    def test_middle_point_has_epoch(self):
+        """Only points after the first with epoch_time get filled (forward pass)."""
+        pts = [
+            self._make_point(0.0, None),
+            self._make_point(1.0, 1001.0),
+            self._make_point(2.0, None),
+        ]
+        gpmf_parser._backfill_gps_timestamps(pts)
+        # Forward pass: only point after 1001 gets filled
+        assert pts[0].epoch_time is None
+        assert pts[1].epoch_time == 1001.0
+        assert pts[2].epoch_time == pytest.approx(1002.0)
+
+    def test_preserves_existing_epoch_times(self):
+        """Points that already have epoch_time are not overwritten."""
+        pts = [
+            self._make_point(0.0, 1000.0),
+            self._make_point(1.0, None),
+            self._make_point(2.0, 1005.0),  # intentionally different
+        ]
+        gpmf_parser._backfill_gps_timestamps(pts)
+        assert pts[2].epoch_time == 1005.0  # preserved, not overwritten
+
+
+# ---------------------------------------------------------------------------
+# 10. _build_matrix, _apply_matrix, _is_matrix_calibration
+# ---------------------------------------------------------------------------
+class TestMatrixOperations:
+    """Tests for _build_matrix / _apply_matrix / _is_matrix_calibration (row-major)."""
+    def test_is_matrix_calibration_identity_like(self):
+        """A matrix with only 0, 1, -1 values is NOT calibration."""
+        assert gpmf_parser._is_matrix_calibration([1, 0, 0, 0, -1, 0, 0, 0, 1]) is False
+
+    def test_is_matrix_calibration_actual(self):
+        """A matrix with non-trivial values IS calibration."""
+        assert (
+            gpmf_parser._is_matrix_calibration([1.5, 0, 0, 0, -1, 0, 0, 0, 1]) is True
+        )
+
+    def test_is_matrix_calibration_all_zeros(self):
+        assert gpmf_parser._is_matrix_calibration([0, 0, 0, 0, 0, 0, 0, 0, 0]) is False
+
+    def test_build_matrix_identity(self):
+        """ORIN='XYZ' ORIO='XYZ' should produce identity."""
+        matrix = gpmf_parser._build_matrix(b"XYZ", b"XYZ")
+        assert matrix == [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0]
+
+    def test_build_matrix_swap_axes(self):
+        """ORIN='YxZ' ORIO='ZXY' swaps and negates axes."""
+        matrix = gpmf_parser._build_matrix(b"YxZ", b"ZXY")
+        # Y -> ZXY: Y matches at index 2 -> [0, 0, 1]
+        # x (lowercase) -> negate X -> [0, -1, 0]
+        # Z -> ZXY: Z matches at index 0 -> [1, 0, 0]
+        assert matrix == [0.0, 0.0, 1.0, 0.0, -1.0, 0.0, 1.0, 0.0, 0.0]
+
+    def test_build_matrix_negate_all(self):
+        """ORIN='xyz' ORIO='XYZ' should produce -1 on diagonal."""
+        matrix = gpmf_parser._build_matrix(b"xyz", b"XYZ")
+        assert matrix == [-1.0, 0.0, 0.0, 0.0, -1.0, 0.0, 0.0, 0.0, -1.0]
+
+    def test_apply_matrix_identity(self):
+        identity = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0]
+        values = [10.0, 20.0, 30.0]
+        result = list(gpmf_parser._apply_matrix(identity, values))
+        assert result == [10.0, 20.0, 30.0]
+
+    def test_apply_matrix_negate_y(self):
+        matrix = [1.0, 0.0, 0.0, 0.0, -1.0, 0.0, 0.0, 0.0, 1.0]
+        values = [10.0, 20.0, 30.0]
+        result = list(gpmf_parser._apply_matrix(matrix, values))
+        assert result == [10.0, -20.0, 30.0]
+
+    def test_apply_matrix_swap(self):
+        matrix = [0.0, 0.0, 1.0, 0.0, -1.0, 0.0, 1.0, 0.0, 0.0]
+        values = [10.0, 20.0, 30.0]
+        result = list(gpmf_parser._apply_matrix(matrix, values))
+        assert result == [30.0, -20.0, 10.0]
+
+    def test_apply_matrix_wrong_size_asserts(self):
+        matrix = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0]  # 6 elements, not 9
+        with pytest.raises(AssertionError, match="square matrix"):
+            list(gpmf_parser._apply_matrix(matrix, [10.0, 20.0, 30.0]))
+
+    def test_apply_matrix_2d(self):
+        """2x2 matrix multiplication."""
+        matrix = [0.0, 1.0, 1.0, 0.0]  # swap
+        values = [3.0, 7.0]
+        result = list(gpmf_parser._apply_matrix(matrix, values))
+        assert result == [7.0, 3.0]
+
+
+# ---------------------------------------------------------------------------
+# 11. _scale_and_calibrate
+# ---------------------------------------------------------------------------
+class TestScaleAndCalibrate:
+    """Tests for _scale_and_calibrate: divide samples by SCAL, then apply ORIN/ORIO matrix."""
+    def test_basic_scaling(self):
+        stream = [
+            _make_klv(b"SCAL", b"s", [[100], [200], [300]]),
+            _make_klv(b"ACCL", b"s", [[1000, 2000, 3000], [500, 600, 900]]),
+        ]
+        results = list(gpmf_parser._scale_and_calibrate(stream, b"ACCL"))
+        assert len(results) == 2
+        assert results[0] == pytest.approx((10.0, 10.0, 10.0))  # per-axis divisors
+        assert results[1] == pytest.approx((5.0, 3.0, 3.0))
+
+    def test_single_scale_repeated(self):
+        """Single SCAL value is repeated for all elements."""
+        stream = [
+            _make_klv(b"SCAL", b"s", [[100]]),
+            _make_klv(b"GYRO", b"s", [[200, 400, 600]]),
+        ]
+        results = list(gpmf_parser._scale_and_calibrate(stream, b"GYRO"))
+        assert len(results) == 1
+        assert results[0] == pytest.approx((2.0, 4.0, 6.0))
+
+    def test_missing_key_returns_empty(self):
+        stream = [
+            _make_klv(b"SCAL", b"s", [[100]]),
+            _make_klv(b"ACCL", b"s", [[200, 300, 400]]),
+        ]
+        results = list(gpmf_parser._scale_and_calibrate(stream, b"GYRO"))
+        assert results == []
+
+    def test_with_orin_orio_matrix(self):
+        """Matrix from ORIN/ORIO applied to scaled values."""
+        stream = [
+            _make_klv(b"SCAL", b"s", [[100], [100], [100]]),
+            _make_klv(b"ACCL", b"s", [[1000, 2000, 3000]]),
+            _make_klv(b"ORIN", b"c", [b"Y", b"x", b"Z"]),
+            _make_klv(b"ORIO", b"c", [b"Z", b"X", b"Y"]),
+        ]
+        results = list(gpmf_parser._scale_and_calibrate(stream, b"ACCL"))
+        assert len(results) == 1
+        # ORIN=YxZ, ORIO=ZXY -> matrix=[0,0,1, 0,-1,0, 1,0,0]
+        # scaled = [10, 20, 30]
+        # matrix * scaled = [30, -20, 10]
+        assert results[0] == pytest.approx((30.0, -20.0, 10.0))
+
+    def test_zero_scal_replaced_with_one(self):
+        """Zero SCAL values should be replaced with 1 to avoid division by zero."""
+        stream = [
+            _make_klv(b"SCAL", b"s", [[0], [100], [0]]),
+            _make_klv(b"ACCL", b"s", [[500, 1000, 300]]),
+        ]
+        results = list(gpmf_parser._scale_and_calibrate(stream, b"ACCL"))
+        assert len(results) == 1
+        # 0 replaced with 1, so: 500/1=500, 1000/100=10, 300/1=300
+        assert results[0] == pytest.approx((500.0, 10.0, 300.0))
+
+
+# ---------------------------------------------------------------------------
+# 12. KLV parsing basics (existing test expanded)
+# ---------------------------------------------------------------------------
+class TestKLVParsing:
+    """Smoke tests for the construct-based KLV / GPMFSampleData parsers."""
+    def test_simple_klv(self):
+        x = gpmf_parser.KLV.parse(b"DEMO\x02\x01\x00\x01\xff\x00\x00\x00")
+        assert x["key"] == b"DEMO"  # 4-byte FourCC key
+
+    def test_gpmf_sample_data(self):
+        x = gpmf_parser.GPMFSampleData.parse(
+            b"DEM1\x01\x01\x00\x01\xff\x00\x00\x00DEM2\x03\x00\x00\x01"
+        )
+        assert len(x) == 2  # two consecutive KLV entries
+        assert x[0]["key"] == b"DEM1"
+        assert x[1]["key"] == b"DEM2"
+
+
+# ---------------------------------------------------------------------------
+# 13. GoProInfo dataclass defaults
+# ---------------------------------------------------------------------------
+class TestGoProInfo:
+    """GoProInfo dataclass default values."""
+    def test_defaults(self):
+        info = gpmf_parser.GoProInfo()
+        assert info.gps is None
+        assert info.accl is None
+        assert info.gyro is None
+        assert info.magn is None
+        assert info.make == "GoPro"  # make is fixed; model is extracted per video
+        assert info.model == ""
+
+
+# ============================================================================
+# INTEGRATION TESTS (require real test data files)
+# ============================================================================
+
+
+# ---------------------------------------------------------------------------
+# 14. extract_gopro_info with GPS5 file (GoPro MAX)
+# ---------------------------------------------------------------------------
+@pytest.mark.skipif(not _has_gps5_video, reason="GPS5 test data not available")
+class TestExtractGoProInfoGPS5:
+    """Integration: extract_gopro_info on a real GPS5 (GoPro Max) video."""
+    def test_basic_extraction(self):
+        with open(GPS5_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f)
+        assert info is not None
+        assert info.make == "GoPro"
+        assert info.model == "GoPro Max"
+
+    def test_gps_count(self):
+        with open(GPS5_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f)
+        assert info.gps is not None
+        assert len(info.gps) == 1737  # known count for this fixture video
+
+    def test_first_gps_point(self):
+        with open(GPS5_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f)
+        p = info.gps[0]
+        assert p.lat == pytest.approx(47.3598318, abs=1e-6)
+        assert p.lon == pytest.approx(8.5227055, abs=1e-6)
+        assert p.alt == pytest.approx(414.87, abs=0.1)
+        assert p.fix == telemetry.GPSFix.FIX_3D
+        assert p.precision == 219
+        assert p.ground_speed == pytest.approx(0.891, abs=0.01)
+
+    def test_epoch_time_backfilled(self):
+        """All GPS points should have epoch_time after backfilling."""
+        with open(GPS5_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f)
+        for p in info.gps:
+            assert p.epoch_time is not None
+
+    def test_telemetry_not_extracted_by_default(self):
+        """By default (telemetry_only=False), ACCL/GYRO/MAGN are None."""
+        with open(GPS5_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f)
+        assert info.accl is None
+        assert info.gyro is None
+        assert info.magn is None
+
+
+# ---------------------------------------------------------------------------
+# 15. extract_gopro_info with GPS9 file (GoPro MAX 2)
+# ---------------------------------------------------------------------------
+@pytest.mark.skipif(not _has_gps9_video, reason="GPS9 test data not available")
+class TestExtractGoProInfoGPS9:
+    """Integration: extract_gopro_info on a real GPS9 (GoPro MAX 2) video."""
+    def test_basic_extraction(self):
+        with open(GPS9_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f)
+        assert info is not None
+        assert info.make == "GoPro"
+        assert info.model == "MAX2"
+
+    def test_gps_count(self):
+        with open(GPS9_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f)
+        assert info.gps is not None
+        assert len(info.gps) == 267  # known count for this fixture video
+
+    def test_first_gps_point(self):
+        with open(GPS9_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f)
+        p = info.gps[0]
+        assert p.lat == pytest.approx(51.0776007, abs=1e-5)
+        assert p.lon == pytest.approx(6.2268453, abs=1e-5)
+        assert p.alt == pytest.approx(85.454, abs=0.1)
+        assert p.fix == telemetry.GPSFix.FIX_3D
+        assert p.precision == pytest.approx(185.0, abs=1.0)
+
+    def test_epoch_time_backfilled(self):
+        with open(GPS9_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f)
+        for p in info.gps:
+            assert p.epoch_time is not None
+
+
+# ---------------------------------------------------------------------------
+# 16. extract_gopro_info with telemetry_only mode
+# ---------------------------------------------------------------------------
+@pytest.mark.skipif(not _has_hero7_video, reason="Hero7 test data not available")
+class TestExtractGoProInfoTelemetryOnly:
+    """Integration: extract_gopro_info(telemetry_only=True) returns IMU data only."""
+    def test_telemetry_only_no_gps(self):
+        """In telemetry_only mode, GPS is None."""
+        with open(HERO7_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f, telemetry_only=True)
+        assert info is not None
+        assert info.gps is None
+
+    def test_telemetry_only_no_model(self):
+        """In telemetry_only mode, model is not extracted."""
+        with open(HERO7_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f, telemetry_only=True)
+        assert info.model == ""
+
+    def test_telemetry_only_has_accl(self):
+        with open(HERO7_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f, telemetry_only=True)
+        assert info.accl is not None
+        assert len(info.accl) == 103806  # known sample count for this fixture
+
+    def test_telemetry_only_has_gyro(self):
+        with open(HERO7_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f, telemetry_only=True)
+        assert info.gyro is not None
+        assert len(info.gyro) == 103806
+
+    def test_telemetry_only_first_accl_values(self):
+        with open(HERO7_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f, telemetry_only=True)
+        a = info.accl[0]
+        assert a.x == pytest.approx(1.672, abs=0.01)
+        assert a.y == pytest.approx(5.175, abs=0.01)
+        assert a.z == pytest.approx(11.022, abs=0.01)
+
+    def test_telemetry_only_first_gyro_values(self):
+        with open(HERO7_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f, telemetry_only=True)
+        g = info.gyro[0]
+        assert g.x == pytest.approx(0.121, abs=0.01)
+        assert g.y == pytest.approx(0.896, abs=0.01)
+        assert g.z == pytest.approx(0.165, abs=0.01)
+
+
+# ---------------------------------------------------------------------------
+# 17. extract_gopro_info normal mode with Hero 7
+# ---------------------------------------------------------------------------
+@pytest.mark.skipif(not _has_hero7_video, reason="Hero7 test data not available")
+class TestExtractGoProInfoHero7:
+    """Integration: extract_gopro_info default mode on a real Hero 7 video."""
+    def test_model_is_hero7(self):
+        with open(HERO7_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f)
+        assert info is not None
+        assert info.model == "Hero7 Black"
+
+    def test_gps_count(self):
+        with open(HERO7_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f)
+        assert info.gps is not None
+        assert len(info.gps) == 2435  # known count for this fixture video
+
+    def test_first_gps_point(self):
+        with open(HERO7_VIDEO, "rb") as f:
+            info = gpmf_parser.extract_gopro_info(f)
+        p = info.gps[0]
+        assert p.lat == pytest.approx(34.3985866, abs=1e-5)
+        assert p.lon == pytest.approx(-119.6986334, abs=1e-5)
+        assert p.alt == pytest.approx(-34.141, abs=0.1)
+        assert p.fix == telemetry.GPSFix.FIX_3D
+        assert p.ground_speed == pytest.approx(0.313, abs=0.01)
+
+
+# ---------------------------------------------------------------------------
+# 18. _flatten helper
+# ---------------------------------------------------------------------------
+class TestFlatten:
+    """Tests for _flatten: one-level flattening of a list of lists."""
+    def test_basic(self):
+        assert gpmf_parser._flatten([[1, 2], [3, 4]]) == [1, 2, 3, 4]
+
+    def test_single_row(self):
+        assert gpmf_parser._flatten([[5, 6, 7]]) == [5, 6, 7]
+
+    def test_empty(self):
+        assert gpmf_parser._flatten([]) == []
diff --git a/tests/unit/test_sample_video.py b/tests/unit/test_sample_video.py
index f9c076cb..e92aef4a 100644
--- a/tests/unit/test_sample_video.py
+++ b/tests/unit/test_sample_video.py
@@ -11,16 +11,31 @@
import shutil
import typing as T
from pathlib import Path
+from unittest import mock
import py.path
import pytest
-from mapillary_tools import exif_read, ffmpeg, sample_video
+
+from mapillary_tools import (
+ exceptions,
+ exif_read,
+ ffmpeg as ffmpeglib,
+ geo,
+ sample_video,
+)
+from mapillary_tools.mp4 import mp4_sample_parser
from mapillary_tools.serializer import description
+from mapillary_tools.types import FileType, VideoMetadata
_PWD = Path(os.path.dirname(os.path.abspath(__file__)))
-class MOCK_FFMPEG(ffmpeg.FFMPEG):
+# ---------------------------------------------------------------------------
+# Interval-based sampling tests (using MOCK_FFMPEG)
+# ---------------------------------------------------------------------------
+
+
+class MOCK_FFMPEG(ffmpeglib.FFMPEG):
def extract_frames_by_interval(
self,
video_path: Path,
@@ -40,14 +55,14 @@ def extract_frames_by_interval(
sample = f"{frame_path_prefix}_{stream_specifier}_{idx + 1:06d}.jpg"
shutil.copyfile(src, sample)
- def probe_format_and_streams(self, video_path: Path) -> ffmpeg.ProbeOutput:
+ def probe_format_and_streams(self, video_path: Path) -> ffmpeglib.ProbeOutput:
with open(video_path) as fp:
return json.load(fp)
@pytest.fixture
def setup_mock(monkeypatch):
- monkeypatch.setattr(ffmpeg, "FFMPEG", MOCK_FFMPEG)
+ monkeypatch.setattr(ffmpeglib, "FFMPEG", MOCK_FFMPEG)
def _validate_interval(samples: T.Sequence[Path], video_start_time):
@@ -107,3 +122,490 @@ def test_sample_video_with_start_time(tmpdir: py.path.local, setup_mock):
)
samples = sample_dir.join("hello.mp4").listdir()
_validate_interval([Path(s) for s in samples], video_start_time)
+
+
+# ---------------------------------------------------------------------------
+# Helpers for distance-based sampling tests
+# ---------------------------------------------------------------------------
+
+MOCK_PROBE_JSON = _PWD / "data" / "mock_sample_video" / "videos" / "hello.mp4"
+TEST_EXIF_JPG = _PWD / "data" / "test_exif.jpg"
+
+# Start time derived from the hello.mp4 probe fixture:
+# creation_time "2021-08-10T14:38:06.000000Z" - duration "60.977000"
+PROBE_START_TIME = datetime.datetime(
+    2021, 8, 10, 14, 37, 5, 23000, tzinfo=datetime.timezone.utc
+)
+
+
+def _load_probe_output() -> ffmpeglib.ProbeOutput:
+ with open(MOCK_PROBE_JSON) as fp:
+ return T.cast(ffmpeglib.ProbeOutput, json.load(fp))
+
+
+def _make_gps_points(
+ n: int = 10,
+ start_lat: float = 40.0,
+ start_lon: float = -74.0,
+ lat_step: float = 0.001,
+ lon_step: float = 0.001,
+ time_step: float = 1.0,
+) -> list[geo.Point]:
+ """Create a synthetic GPS track with n points."""
+ return [
+ geo.Point(
+ time=i * time_step,
+ lat=start_lat + i * lat_step,
+ lon=start_lon + i * lon_step,
+ alt=10.0,
+ angle=45.0,
+ )
+ for i in range(n)
+ ]
+
+
+def _make_sample(
+ composition_time: float,
+ timedelta: float = 0.033,
+) -> mp4_sample_parser.Sample:
+ """Create a synthetic mp4 Sample at the given composition time."""
+ raw = mp4_sample_parser.RawSample(
+ description_idx=1,
+ offset=0,
+ size=1000,
+ timedelta=int(timedelta * 1000),
+ composition_offset=0,
+ is_sync=True,
+ )
+ return mp4_sample_parser.Sample(
+ raw_sample=raw,
+ exact_time=composition_time,
+ exact_composition_time=composition_time,
+ exact_timedelta=timedelta,
+ description={},
+ )
+
+
+def _create_fake_frames(
+ sample_dir: Path,
+ video_stem: str,
+ stream_specifier: str,
+ num_frames: int,
+) -> list[Path]:
+ """Create fake JPEG frame files in sample_dir mimicking ffmpeg output."""
+ os.makedirs(sample_dir, exist_ok=True)
+ paths: list[Path] = []
+ for i in range(1, num_frames + 1):
+ name = f"{video_stem}_{stream_specifier}_{i:06d}.jpg"
+ frame_path = sample_dir / name
+ shutil.copy(str(TEST_EXIF_JPG), str(frame_path))
+ paths.append(frame_path)
+ return paths
+
+
+# ---------------------------------------------------------------------------
+# Distance-based sampling: _within_track_time_range_buffered
+# ---------------------------------------------------------------------------
+
+
+class TestWithinTrackTimeRangeBuffered:
+ """Tests for _within_track_time_range_buffered."""
+
+ def test_within_range(self) -> None:
+ points = _make_gps_points(5, time_step=1.0)
+ assert sample_video._within_track_time_range_buffered(points, 2.0) is True
+
+ def test_at_start_boundary(self) -> None:
+ points = _make_gps_points(5, time_step=1.0)
+ assert sample_video._within_track_time_range_buffered(points, 0.0) is True
+
+ def test_at_end_boundary(self) -> None:
+ points = _make_gps_points(5, time_step=1.0)
+ assert sample_video._within_track_time_range_buffered(points, 4.0) is True
+
+ def test_within_1ms_buffer_before_start(self) -> None:
+ points = _make_gps_points(5, time_step=1.0)
+ assert sample_video._within_track_time_range_buffered(points, -0.0005) is True
+
+ def test_within_1ms_buffer_after_end(self) -> None:
+ points = _make_gps_points(5, time_step=1.0)
+ assert sample_video._within_track_time_range_buffered(points, 4.0005) is True
+
+ def test_outside_buffer_before_start(self) -> None:
+ points = _make_gps_points(5, time_step=1.0)
+ assert sample_video._within_track_time_range_buffered(points, -0.002) is False
+
+ def test_outside_buffer_after_end(self) -> None:
+ points = _make_gps_points(5, time_step=1.0)
+ assert sample_video._within_track_time_range_buffered(points, 4.002) is False
+
+ def test_exactly_at_1ms_boundary(self) -> None:
+ points = _make_gps_points(5, time_step=1.0)
+ assert sample_video._within_track_time_range_buffered(points, -0.001) is True
+ assert sample_video._within_track_time_range_buffered(points, 4.001) is True
+
+
+# ---------------------------------------------------------------------------
+# Distance-based sampling: _sample_video_stream_by_distance
+# ---------------------------------------------------------------------------
+
+
+class TestSampleVideoStreamByDistance:
+ """Tests for _sample_video_stream_by_distance."""
+
+ def test_selects_frames_by_distance(self) -> None:
+ """Frames spaced farther than sample_distance should be selected."""
+ points = _make_gps_points(10, lat_step=0.001, time_step=1.0)
+ samples = [_make_sample(float(i)) for i in range(10)]
+
+ mock_parser = mock.MagicMock(spec=mp4_sample_parser.TrackBoxParser)
+ mock_parser.extract_samples.return_value = iter(samples)
+
+ result = sample_video._sample_video_stream_by_distance(
+ points, mock_parser, sample_distance=50.0
+ )
+
+        # Steps of 0.001° in both lat and lon (~140 m) exceed 50 m, so all 10 selected
+ assert len(result) == 10
+ assert all(idx in result for idx in range(10))
+
+ def test_filters_close_frames(self) -> None:
+ """Frames closer than sample_distance should be filtered out."""
+ # ~15m apart (0.0001 degree in each axis)
+ points = _make_gps_points(10, lat_step=0.0001, lon_step=0.0001, time_step=1.0)
+ samples = [_make_sample(float(i)) for i in range(10)]
+
+ mock_parser = mock.MagicMock(spec=mp4_sample_parser.TrackBoxParser)
+ mock_parser.extract_samples.return_value = iter(samples)
+
+ result = sample_video._sample_video_stream_by_distance(
+ points, mock_parser, sample_distance=50.0
+ )
+
+ assert len(result) < 10
+ assert 0 in result # first frame is always selected
+
+ def test_zero_distance_selects_all(self) -> None:
+ """With sample_distance=0, all frames in range should be selected."""
+ points = _make_gps_points(5, time_step=1.0)
+ samples = [_make_sample(float(i)) for i in range(5)]
+
+ mock_parser = mock.MagicMock(spec=mp4_sample_parser.TrackBoxParser)
+ mock_parser.extract_samples.return_value = iter(samples)
+
+ result = sample_video._sample_video_stream_by_distance(
+ points, mock_parser, sample_distance=0.0
+ )
+
+ assert len(result) == 5
+
+ def test_frames_outside_track_range_excluded(self) -> None:
+ """Frames outside the GPS track time range should not be selected."""
+ # GPS track covers t=2..6
+ points = [
+ geo.Point(time=p.time + 2.0, lat=p.lat, lon=p.lon, alt=p.alt, angle=p.angle)
+ for p in _make_gps_points(5, time_step=1.0)
+ ]
+
+ # Samples at t=0..9 — only t=2..6 should be interpolated
+ samples = [_make_sample(float(i)) for i in range(10)]
+
+ mock_parser = mock.MagicMock(spec=mp4_sample_parser.TrackBoxParser)
+ mock_parser.extract_samples.return_value = iter(samples)
+
+ result = sample_video._sample_video_stream_by_distance(
+ points, mock_parser, sample_distance=0.0
+ )
+
+ for idx in result:
+ sample_time = samples[idx].exact_composition_time
+ assert 1.999 <= sample_time <= 6.001
+
+ def test_empty_samples(self) -> None:
+ """Empty video track should produce no selected frames."""
+ points = _make_gps_points(5, time_step=1.0)
+
+ mock_parser = mock.MagicMock(spec=mp4_sample_parser.TrackBoxParser)
+ mock_parser.extract_samples.return_value = iter([])
+
+ result = sample_video._sample_video_stream_by_distance(
+ points, mock_parser, sample_distance=3.0
+ )
+
+ assert len(result) == 0
+
+
+# ---------------------------------------------------------------------------
+# sample_video() parameter validation & rerun
+# ---------------------------------------------------------------------------
+
+
+class TestSampleVideoBadParameters:
+    """Test sample_video() with invalid parameter combinations."""
+
+    def test_distance_with_interval_raises(self, tmp_path: Path) -> None:
+        video_dir = tmp_path / "videos"
+        video_dir.mkdir()
+        (video_dir / "test.mp4").touch()
+
+        with pytest.raises(exceptions.MapillaryBadParameterError):
+ sample_video.sample_video(
+ video_import_path=video_dir,
+ import_path=tmp_path / "output",
+ video_sample_distance=1.0,
+ video_sample_interval=1.0,
+ )
+
+
+class TestSampleVideoRerun:
+ """Test rerun behavior of sample_video."""
+
+ def test_skip_existing_samples_without_rerun(self, tmp_path: Path) -> None:
+ """Existing sample directories should be skipped without --rerun."""
+ video_dir = tmp_path / "videos"
+ video_dir.mkdir()
+ (video_dir / "test.mp4").touch()
+
+ output_dir = tmp_path / "output"
+ sample_dir = output_dir / "test.mp4"
+ sample_dir.mkdir(parents=True)
+ (sample_dir / "frame_000001.jpg").touch()
+
+ with mock.patch.object(
+ sample_video, "_sample_single_video_by_distance"
+ ) as mock_sample:
+ sample_video.sample_video(
+ video_import_path=video_dir,
+ import_path=output_dir,
+ rerun=False,
+ )
+ mock_sample.assert_not_called()
+
+ def test_rerun_removes_existing_and_resamples(self, tmp_path: Path) -> None:
+ """With --rerun, existing sample directories should be removed."""
+ video_dir = tmp_path / "videos"
+ video_dir.mkdir()
+ (video_dir / "test.mp4").touch()
+
+ output_dir = tmp_path / "output"
+ sample_dir = output_dir / "test.mp4"
+ sample_dir.mkdir(parents=True)
+ marker = sample_dir / "old_frame.jpg"
+ marker.touch()
+
+ with mock.patch.object(
+ sample_video, "_sample_single_video_by_distance"
+ ) as mock_sample:
+ sample_video.sample_video(
+ video_import_path=video_dir,
+ import_path=output_dir,
+ rerun=True,
+ )
+ assert not marker.exists()
+ mock_sample.assert_called_once()
+
+
+# ---------------------------------------------------------------------------
+# Distance-based sampling: integration tests with mocked ffmpeg and geotag
+# ---------------------------------------------------------------------------
+
+
+class TestSampleVideoDistanceIntegration:
+ """Integration-style tests for the distance-based sampling path."""
+
+ def _setup_mocks(
+ self,
+ tmp_path: Path,
+ video_path: Path,
+ num_gps_points: int = 10,
+ ) -> dict[str, T.Any]:
+ """Set up all the mocks needed for _sample_single_video_by_distance."""
+ probe_output = _load_probe_output()
+ gps_points = _make_gps_points(num_gps_points, time_step=1.0)
+
+ video_metadata = VideoMetadata(
+ filename=video_path,
+ filetype=FileType.CAMM,
+ points=gps_points,
+ make="TestMake",
+ model="TestModel",
+ )
+
+ video_samples = [_make_sample(float(i)) for i in range(num_gps_points)]
+
+ mock_track_parser = mock.MagicMock(spec=mp4_sample_parser.TrackBoxParser)
+ mock_track_parser.extract_samples.return_value = iter(video_samples)
+
+ mock_moov_parser = mock.MagicMock(spec=mp4_sample_parser.MovieBoxParser)
+ mock_moov_parser.extract_track_at.return_value = mock_track_parser
+
+ patches = {}
+
+ # Mock FFMPEG: instance methods are mocked, classmethods delegate to real
+ def fake_extract_frames(
+ video_path: Path,
+ sample_dir: Path,
+ frame_indices: set[int],
+ stream_specifier: str = "v",
+ ) -> None:
+ _create_fake_frames(
+ sample_dir,
+ video_path.stem,
+ stream_specifier,
+ len(frame_indices),
+ )
+
+ mock_ffmpeg_instance = mock.MagicMock(spec=ffmpeglib.FFMPEG)
+ mock_ffmpeg_instance.probe_format_and_streams.return_value = probe_output
+ mock_ffmpeg_instance.extract_specified_frames.side_effect = fake_extract_frames
+
+ mock_ffmpeg_class = mock.MagicMock()
+ mock_ffmpeg_class.return_value = mock_ffmpeg_instance
+ mock_ffmpeg_class.sort_selected_samples = ffmpeglib.FFMPEG.sort_selected_samples
+ mock_ffmpeg_class.iterate_samples = ffmpeglib.FFMPEG.iterate_samples
+ mock_ffmpeg_class._extract_stream_frame_idx = (
+ ffmpeglib.FFMPEG._extract_stream_frame_idx
+ )
+ mock_ffmpeg_class._validate_stream_specifier = (
+ ffmpeglib.FFMPEG._validate_stream_specifier
+ )
+ mock_ffmpeg_class.FRAME_EXT = ffmpeglib.FFMPEG.FRAME_EXT
+
+ patches["ffmpeg_cls"] = mock.patch(
+ "mapillary_tools.sample_video.ffmpeglib.FFMPEG",
+ mock_ffmpeg_class,
+ )
+
+ mock_geotag_instance = mock.MagicMock()
+ mock_geotag_instance.to_description.return_value = [video_metadata]
+ patches["geotag_cls"] = mock.patch(
+ "mapillary_tools.sample_video.geotag_videos_from_video.GeotagVideosFromVideo",
+ return_value=mock_geotag_instance,
+ )
+
+ patches["moov_parse"] = mock.patch.object(
+ mp4_sample_parser.MovieBoxParser,
+ "parse_file",
+ return_value=mock_moov_parser,
+ )
+
+ return {
+ "patches": patches,
+ "gps_points": gps_points,
+ "video_metadata": video_metadata,
+ }
+
+ def test_single_video_file(self, tmp_path: Path) -> None:
+ """sample_video with a single video file produces sample frames."""
+ video_dir = tmp_path / "videos"
+ video_dir.mkdir()
+ video_file = video_dir / "test.mp4"
+ video_file.touch()
+ output_dir = tmp_path / "output"
+
+ mocks = self._setup_mocks(tmp_path, video_file)
+
+ with (
+ mocks["patches"]["ffmpeg_cls"],
+ mocks["patches"]["geotag_cls"],
+ mocks["patches"]["moov_parse"],
+ ):
+ sample_video.sample_video(
+ video_import_path=video_file,
+ import_path=output_dir,
+ video_sample_distance=0.0,
+ )
+
+ sample_dir = output_dir / "test.mp4"
+ assert sample_dir.is_dir()
+
+ frames = list(sample_dir.glob("*.jpg"))
+ assert len(frames) > 0
+
+ exif = exif_read.ExifRead(frames[0])
+ assert exif.extract_lon_lat() is not None
+ assert exif.extract_capture_time() is not None
+
+ def test_video_directory(self, tmp_path: Path) -> None:
+ """sample_video with a directory processes all videos."""
+ video_dir = tmp_path / "videos"
+ video_dir.mkdir()
+ (video_dir / "clip1.mp4").touch()
+ (video_dir / "clip2.mp4").touch()
+ output_dir = tmp_path / "output"
+
+ with mock.patch.object(
+ sample_video, "_sample_single_video_by_distance"
+ ) as mock_sample:
+ sample_video.sample_video(
+ video_import_path=video_dir,
+ import_path=output_dir,
+ video_sample_distance=3.0,
+ )
+ assert mock_sample.call_count == 2
+
+ def test_custom_start_time(self, tmp_path: Path) -> None:
+ """sample_video with video_start_time override uses the given time."""
+ video_dir = tmp_path / "videos"
+ video_dir.mkdir()
+ video_file = video_dir / "test.mp4"
+ video_file.touch()
+ output_dir = tmp_path / "output"
+
+ mocks = self._setup_mocks(tmp_path, video_file)
+
+ with (
+ mocks["patches"]["ffmpeg_cls"],
+ mocks["patches"]["geotag_cls"],
+ mocks["patches"]["moov_parse"],
+ ):
+ sample_video.sample_video(
+ video_import_path=video_file,
+ import_path=output_dir,
+ video_sample_distance=0.0,
+ video_start_time="2023_06_15_12_00_00_000",
+ )
+
+ sample_dir = output_dir / "test.mp4"
+ frames = list(sample_dir.glob("*.jpg"))
+ assert len(frames) > 0
+
+ exif = exif_read.ExifRead(frames[0])
+ capture_time = exif.extract_capture_time()
+ assert capture_time is not None
+ assert capture_time.year == 2023
+ assert capture_time.month == 6
+ assert capture_time.day == 15
+
+ def test_exif_lat_lon_written(self, tmp_path: Path) -> None:
+ """Verify GPS coordinates are written into EXIF of sampled frames."""
+ video_dir = tmp_path / "videos"
+ video_dir.mkdir()
+ video_file = video_dir / "test.mp4"
+ video_file.touch()
+ output_dir = tmp_path / "output"
+
+ mocks = self._setup_mocks(tmp_path, video_file)
+
+ with (
+ mocks["patches"]["ffmpeg_cls"],
+ mocks["patches"]["geotag_cls"],
+ mocks["patches"]["moov_parse"],
+ ):
+ sample_video.sample_video(
+ video_import_path=video_file,
+ import_path=output_dir,
+ video_sample_distance=0.0,
+ )
+
+ sample_dir = output_dir / "test.mp4"
+ frames = sorted(sample_dir.glob("*.jpg"))
+ assert len(frames) > 0
+
+ exif = exif_read.ExifRead(frames[0])
+ lon_lat = exif.extract_lon_lat()
+ assert lon_lat is not None
+ lon, lat = lon_lat
+ # First GPS point is at (40.0, -74.0)
+ assert abs(lat - 40.0) < 0.01
+ assert abs(lon - (-74.0)) < 0.01